001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 * 019 */ 020 021package org.nuxeo.directory.mongodb; 022 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.function.Consumer; 031import java.util.stream.Collectors; 032import java.util.stream.StreamSupport; 033 034import org.bson.Document; 035import org.bson.conversions.Bson; 036import org.nuxeo.ecm.core.schema.types.SchemaImpl; 037import org.nuxeo.ecm.core.schema.types.primitives.StringType; 038import org.nuxeo.ecm.directory.AbstractReference; 039import org.nuxeo.ecm.directory.BaseDirectoryDescriptor; 040import org.nuxeo.ecm.directory.DirectoryCSVLoader; 041import org.nuxeo.ecm.directory.DirectoryException; 042import org.nuxeo.ecm.directory.Reference; 043 044import com.mongodb.BasicDBList; 045import com.mongodb.BasicDBObject; 046import com.mongodb.MongoWriteException; 047import com.mongodb.client.FindIterable; 048import com.mongodb.client.MongoCollection; 049import com.mongodb.client.result.DeleteResult; 050import org.nuxeo.ecm.directory.ReferenceDescriptor; 051import org.nuxeo.ecm.directory.Session; 052 053/** 054 * MongoDB implementation of a {@link Reference} 055 * 056 * @since 9.1 057 */ 058public class MongoDBReference extends AbstractReference { 059 060 protected String collection; 061 062 protected String sourceField; 063 064 protected String targetField; 065 066 protected String dataFileName; 067 068 private boolean initialized; 069 070 /** 071 * @since 9.2 072 */ 073 public MongoDBReference(String field, String directory, String collection, String sourceField, String targetField, 074 String dataFileName) { 075 super(field, directory); 076 this.collection = collection; 077 this.sourceField = sourceField; 078 this.targetField = targetField; 079 this.dataFileName = dataFileName; 080 } 081 082 /** 083 * @since 9.2 084 */ 085 public MongoDBReference(MongoDBReferenceDescriptor descriptor) { 086 this(descriptor.getFieldName(), descriptor.getTargetDirectoryName(), descriptor.getCollection(), 087 descriptor.getSourceField(), descriptor.getTargetField(), descriptor.getDataFileName()); 088 } 089 090 /** 091 * @since 9.2 092 */ 093 public MongoDBReference(ReferenceDescriptor descriptor) { 094 this(descriptor.getFieldName(), descriptor.getDirectory(), descriptor.getReferenceName(), 095 descriptor.getSource(), descriptor.getTarget(), descriptor.getDataFileName()); 096 } 097 098 @Override 099 public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException { 100 try (MongoDBSession session = getMongoDBSession()) { 101 addLinks(sourceId, targetIds, session); 102 } 103 } 104 105 @Override 106 public void addLinks(String sourceId, List<String> targetIds, Session session) throws DirectoryException { 107 MongoDBSession mongoSession = (MongoDBSession) session; 108 if (!initialized) { 109 if (dataFileName != null) { 110 initializeSession(mongoSession); 111 } 112 initialized = true; 113 } 114 if (targetIds == null || targetIds.isEmpty()) { 115 return; 116 } 117 try { 118 MongoCollection<Document> coll = mongoSession.getCollection(collection); 119 List<Document> newDocs = targetIds.stream() 120 .map(targetId -> buildDoc(sourceId, targetId)) 121 .filter(doc -> coll.count(doc) == 0) 122 .collect(Collectors.toList()); 123 if (!newDocs.isEmpty()) { 124 coll.insertMany(newDocs); 125 } 126 } catch (MongoWriteException e) { 127 throw new DirectoryException(e); 128 } 129 } 130 131 @Override 132 public void addLinks(List<String> sourceIds, String targetId, Session session) throws DirectoryException { 133 MongoDBSession mongodbSession = (MongoDBSession) session; 134 MongoCollection<Document> coll = mongodbSession.getCollection(collection); 135 List<Document> newDocs = sourceIds.stream() 136 .map(sourceId -> buildDoc(sourceId, targetId)) 137 .filter(doc -> coll.count(doc) == 0) 138 .collect(Collectors.toList()); 139 if (!newDocs.isEmpty()) { 140 coll.insertMany(newDocs); 141 } 142 } 143 144 @Override 145 public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException { 146 if (sourceIds == null || sourceIds.isEmpty()) { 147 return; 148 } 149 try (MongoDBSession session = getMongoDBSession()) { 150 addLinks(sourceIds, targetId, session); 151 } catch (MongoWriteException e) { 152 throw new DirectoryException(e); 153 } 154 } 155 156 @Override 157 public void removeLinksForSource(String sourceId) throws DirectoryException { 158 try (MongoDBSession session = getMongoDBSession()) { 159 removeLinksForSource(sourceId, session); 160 } 161 } 162 163 @Override 164 public void removeLinksForSource(String sourceId, Session session) { 165 removeLinksFor(sourceField, sourceId, (MongoDBSession) session); 166 } 167 168 @Override 169 public void removeLinksForTarget(String targetId) throws DirectoryException { 170 try (MongoDBSession session = getMongoDBSession()) { 171 removeLinksFor(targetField, targetId, session); 172 } 173 } 174 175 @Override 176 public void removeLinksForTarget(String targetId, Session session) throws DirectoryException { 177 removeLinksFor(targetField, targetId, (MongoDBSession) session); 178 } 179 180 private void removeLinksFor(String field, String value, MongoDBSession session) { 181 try { 182 DeleteResult result = session.getCollection(collection) 183 .deleteMany(MongoDBSerializationHelper.fieldMapToBson(field, value)); 184 if (!result.wasAcknowledged()) { 185 throw new DirectoryException( 186 "Error while deleting the entry, the request has not been acknowledged by the server"); 187 } 188 } catch (MongoWriteException e) { 189 throw new DirectoryException(e); 190 } 191 } 192 193 @Override 194 public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException { 195 try (MongoDBSession session = getMongoDBSession()) { 196 return getIdsFor(sourceField, sourceId, targetField, session); 197 } 198 } 199 200 /** 201 * Retrieves all target ids associated to the given source id 202 * 203 * @param sourceId the source id 204 * @param session the mongoDB session 205 * @return the list of target ids 206 * @throws DirectoryException 207 */ 208 public List<String> getTargetIdsForSource(String sourceId, MongoDBSession session) throws DirectoryException { 209 return getIdsFor(sourceField, sourceId, targetField, session); 210 } 211 212 @Override 213 public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException { 214 try (MongoDBSession session = getMongoDBSession()) { 215 return getIdsFor(targetField, targetId, sourceField, session); 216 } 217 } 218 219 private List<String> getIdsFor(String queryField, String value, String resultField, MongoDBSession session) { 220 FindIterable<Document> docs = session.getCollection(collection) 221 .find(MongoDBSerializationHelper.fieldMapToBson(queryField, value)); 222 return StreamSupport.stream(docs.spliterator(), false) 223 .map(doc -> doc.getString(resultField)) 224 .collect(Collectors.toList()); 225 } 226 227 @Override 228 public void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException { 229 try (MongoDBSession session = getMongoDBSession()) { 230 setTargetIdsForSource(sourceId, targetIds, session); 231 } 232 } 233 234 @Override 235 public void setTargetIdsForSource(String sourceId, List<String> targetIds, Session session) 236 throws DirectoryException { 237 setIdsFor(sourceField, sourceId, targetField, targetIds, (MongoDBSession) session); 238 } 239 240 @Override 241 public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException { 242 try (MongoDBSession session = getMongoDBSession()) { 243 setIdsFor(targetField, targetId, sourceField, sourceIds, session); 244 } 245 } 246 247 @Override 248 public void setSourceIdsForTarget(String targetId, List<String> sourceIds, Session session) 249 throws DirectoryException { 250 setIdsFor(targetField, targetId, sourceField, sourceIds, (MongoDBSession) session); 251 } 252 253 private void setIdsFor(String field, String value, String fieldToUpdate, List<String> ids, MongoDBSession session) { 254 Set<String> idsToAdd = new HashSet<>(); 255 if (ids != null) { 256 idsToAdd.addAll(ids); 257 } 258 List<String> idsToDelete = new ArrayList<>(); 259 260 List<String> existingIds = getIdsFor(field, value, fieldToUpdate, session); 261 for (String id : existingIds) { 262 if (!idsToAdd.remove(id)) { 263 idsToDelete.add(id); 264 } 265 } 266 267 if (!idsToDelete.isEmpty()) { 268 BasicDBList list = new BasicDBList(); 269 if (sourceField.equals(field)) { 270 list.addAll(idsToDelete.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList())); 271 } else { 272 list.addAll(idsToDelete.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList())); 273 } 274 Bson deleteDoc = new BasicDBObject("$or", list); 275 session.getCollection(collection).deleteMany(deleteDoc); 276 } 277 278 if (!idsToAdd.isEmpty()) { 279 List<Document> list; 280 if (sourceField.equals(field)) { 281 list = idsToAdd.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList()); 282 } else { 283 list = idsToAdd.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList()); 284 } 285 session.getCollection(collection).insertMany(list); 286 } 287 } 288 289 private Document buildDoc(String sourceId, String targetId) { 290 Map<String, Object> fieldMap = new HashMap<>(); 291 fieldMap.put(sourceField, sourceId); 292 fieldMap.put(targetField, targetId); 293 return MongoDBSerializationHelper.fieldMapToBson(fieldMap); 294 } 295 296 protected void initializeSession(MongoDBSession session) { 297 // fake schema for DirectoryCSVLoader.loadData 298 SchemaImpl schema = new SchemaImpl(collection, null); 299 schema.addField(sourceField, StringType.INSTANCE, null, 0, Collections.emptySet()); 300 schema.addField(targetField, StringType.INSTANCE, null, 0, Collections.emptySet()); 301 302 Consumer<Map<String, Object>> loader = map -> { 303 Document doc = MongoDBSerializationHelper.fieldMapToBson(map); 304 MongoCollection<Document> coll = session.getCollection(collection); 305 if (coll.count(doc) == 0) { 306 coll.insertOne(doc); 307 } 308 }; 309 DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR, schema, 310 loader); 311 } 312 313 protected MongoDBSession getMongoDBSession() throws DirectoryException { 314 if (!initialized) { 315 if (dataFileName != null) { 316 try (MongoDBSession session = (MongoDBSession) getSourceDirectory().getSession()) { 317 initializeSession(session); 318 } 319 } 320 initialized = true; 321 } 322 return (MongoDBSession) getSourceDirectory().getSession(); 323 } 324 325}