001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 * 019 */ 020 021package org.nuxeo.directory.mongodb; 022 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.function.Consumer; 031import java.util.stream.Collectors; 032import java.util.stream.StreamSupport; 033 034import org.bson.Document; 035import org.bson.conversions.Bson; 036import org.nuxeo.ecm.core.schema.types.SchemaImpl; 037import org.nuxeo.ecm.core.schema.types.primitives.StringType; 038import org.nuxeo.ecm.directory.AbstractReference; 039import org.nuxeo.ecm.directory.BaseDirectoryDescriptor; 040import org.nuxeo.ecm.directory.DirectoryCSVLoader; 041import org.nuxeo.ecm.directory.DirectoryException; 042import org.nuxeo.ecm.directory.Reference; 043 044import com.mongodb.BasicDBList; 045import com.mongodb.BasicDBObject; 046import com.mongodb.MongoWriteException; 047import com.mongodb.client.FindIterable; 048import com.mongodb.client.MongoCollection; 049import com.mongodb.client.result.DeleteResult; 050import org.nuxeo.ecm.directory.ReferenceDescriptor; 051import org.nuxeo.ecm.directory.Session; 052 053/** 054 * MongoDB implementation of a {@link Reference} 055 * 056 * @since 9.1 057 */ 058public class MongoDBReference extends AbstractReference { 059 060 protected String collection; 061 062 protected String sourceField; 063 064 protected String targetField; 065 066 protected String dataFileName; 067 068 private boolean initialized; 069 070 /** 071 * @since 9.2 072 */ 073 public MongoDBReference(String field, String directory, String collection, String sourceField, String targetField, 074 String dataFileName) { 075 super(field, directory); 076 this.collection = collection; 077 this.sourceField = sourceField; 078 this.targetField = targetField; 079 this.dataFileName = dataFileName; 080 } 081 082 /** 083 * @since 9.2 084 */ 085 public MongoDBReference(MongoDBReferenceDescriptor descriptor) { 086 this(descriptor.getFieldName(), descriptor.getTargetDirectoryName(), descriptor.getCollection(), 087 descriptor.getSourceField(), descriptor.getTargetField(), descriptor.getDataFileName()); 088 } 089 090 /** 091 * @since 9.2 092 */ 093 public MongoDBReference(ReferenceDescriptor descriptor) { 094 this(descriptor.getFieldName(), descriptor.getDirectory(), descriptor.getReferenceName(), 095 descriptor.getSource(), descriptor.getTarget(), descriptor.getDataFileName()); 096 } 097 098 @Override 099 public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException { 100 try (MongoDBSession session = getMongoDBSession()) { 101 addLinks(sourceId, targetIds, session); 102 } 103 } 104 105 @Override 106 public void addLinks(String sourceId, List<String> targetIds, Session session) throws DirectoryException { 107 MongoDBSession mongoSession = (MongoDBSession) session; 108 if (!initialized) { 109 if (dataFileName != null) { 110 initializeSession(mongoSession); 111 } 112 initialized = true; 113 } 114 if (targetIds == null || targetIds.isEmpty()) { 115 return; 116 } 117 try { 118 MongoCollection<Document> coll = mongoSession.getCollection(collection); 119 List<Document> newDocs = targetIds.stream() 120 .map(targetId -> buildDoc(sourceId, targetId)) 121 .filter(doc -> coll.count(doc) == 0) 122 .collect(Collectors.toList()); 123 coll.insertMany(newDocs); 124 } catch (MongoWriteException e) { 125 throw new DirectoryException(e); 126 } 127 } 128 129 @Override 130 public void addLinks(List<String> sourceIds, String targetId, Session session) throws DirectoryException { 131 MongoDBSession mongodbSession = (MongoDBSession) session; 132 MongoCollection<Document> coll = mongodbSession.getCollection(collection); 133 List<Document> newDocs = sourceIds.stream() 134 .map(sourceId -> buildDoc(sourceId, targetId)) 135 .filter(doc -> coll.count(doc) == 0) 136 .collect(Collectors.toList()); 137 if (!newDocs.isEmpty()) { 138 coll.insertMany(newDocs); 139 } 140 } 141 142 @Override 143 public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException { 144 if (sourceIds == null || sourceIds.isEmpty()) { 145 return; 146 } 147 try (MongoDBSession session = getMongoDBSession()) { 148 addLinks(sourceIds, targetId, session); 149 } catch (MongoWriteException e) { 150 throw new DirectoryException(e); 151 } 152 } 153 154 @Override 155 public void removeLinksForSource(String sourceId) throws DirectoryException { 156 try (MongoDBSession session = getMongoDBSession()) { 157 removeLinksForSource(sourceId, session); 158 } 159 } 160 161 @Override 162 public void removeLinksForSource(String sourceId, Session session) { 163 removeLinksFor(sourceField, sourceId, (MongoDBSession) session); 164 } 165 166 @Override 167 public void removeLinksForTarget(String targetId) throws DirectoryException { 168 try (MongoDBSession session = getMongoDBSession()) { 169 removeLinksFor(targetField, targetId, session); 170 } 171 } 172 173 @Override 174 public void removeLinksForTarget(String targetId, Session session) throws DirectoryException { 175 removeLinksFor(targetField, targetId, (MongoDBSession) session); 176 } 177 178 private void removeLinksFor(String field, String value, MongoDBSession session) { 179 try { 180 DeleteResult result = session.getCollection(collection) 181 .deleteMany(MongoDBSerializationHelper.fieldMapToBson(field, value)); 182 if (!result.wasAcknowledged()) { 183 throw new DirectoryException( 184 "Error while deleting the entry, the request has not been acknowledged by the server"); 185 } 186 } catch (MongoWriteException e) { 187 throw new DirectoryException(e); 188 } 189 } 190 191 @Override 192 public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException { 193 try (MongoDBSession session = getMongoDBSession()) { 194 return getIdsFor(sourceField, sourceId, targetField, session); 195 } 196 } 197 198 /** 199 * Retrieves all target ids associated to the given source id 200 * 201 * @param sourceId the source id 202 * @param session the mongoDB session 203 * @return the list of target ids 204 * @throws DirectoryException 205 */ 206 public List<String> getTargetIdsForSource(String sourceId, MongoDBSession session) throws DirectoryException { 207 return getIdsFor(sourceField, sourceId, targetField, session); 208 } 209 210 @Override 211 public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException { 212 try (MongoDBSession session = getMongoDBSession()) { 213 return getIdsFor(targetField, targetId, sourceField, session); 214 } 215 } 216 217 private List<String> getIdsFor(String queryField, String value, String resultField, MongoDBSession session) { 218 FindIterable<Document> docs = session.getCollection(collection) 219 .find(MongoDBSerializationHelper.fieldMapToBson(queryField, value)); 220 return StreamSupport.stream(docs.spliterator(), false) 221 .map(doc -> doc.getString(resultField)) 222 .collect(Collectors.toList()); 223 } 224 225 @Override 226 public void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException { 227 try (MongoDBSession session = getMongoDBSession()) { 228 setTargetIdsForSource(sourceId, targetIds, session); 229 } 230 } 231 232 @Override 233 public void setTargetIdsForSource(String sourceId, List<String> targetIds, Session session) 234 throws DirectoryException { 235 setIdsFor(sourceField, sourceId, targetField, targetIds, (MongoDBSession) session); 236 } 237 238 @Override 239 public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException { 240 try (MongoDBSession session = getMongoDBSession()) { 241 setIdsFor(targetField, targetId, sourceField, sourceIds, session); 242 } 243 } 244 245 @Override 246 public void setSourceIdsForTarget(String targetId, List<String> sourceIds, Session session) 247 throws DirectoryException { 248 setIdsFor(targetField, targetId, sourceField, sourceIds, (MongoDBSession) session); 249 } 250 251 private void setIdsFor(String field, String value, String fieldToUpdate, List<String> ids, MongoDBSession session) { 252 Set<String> idsToAdd = new HashSet<>(); 253 if (ids != null) { 254 idsToAdd.addAll(ids); 255 } 256 List<String> idsToDelete = new ArrayList<>(); 257 258 List<String> existingIds = getIdsFor(field, value, fieldToUpdate, session); 259 for (String id : existingIds) { 260 if (!idsToAdd.remove(id)) { 261 idsToDelete.add(id); 262 } 263 } 264 265 if (!idsToDelete.isEmpty()) { 266 BasicDBList list = new BasicDBList(); 267 if (sourceField.equals(field)) { 268 list.addAll(idsToDelete.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList())); 269 } else { 270 list.addAll(idsToDelete.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList())); 271 } 272 Bson deleteDoc = new BasicDBObject("$or", list); 273 session.getCollection(collection).deleteMany(deleteDoc); 274 } 275 276 if (!idsToAdd.isEmpty()) { 277 List<Document> list; 278 if (sourceField.equals(field)) { 279 list = idsToAdd.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList()); 280 } else { 281 list = idsToAdd.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList()); 282 } 283 session.getCollection(collection).insertMany(list); 284 } 285 } 286 287 private Document buildDoc(String sourceId, String targetId) { 288 Map<String, Object> fieldMap = new HashMap<>(); 289 fieldMap.put(sourceField, sourceId); 290 fieldMap.put(targetField, targetId); 291 return MongoDBSerializationHelper.fieldMapToBson(fieldMap); 292 } 293 294 protected void initializeSession(MongoDBSession session) { 295 // fake schema for DirectoryCSVLoader.loadData 296 SchemaImpl schema = new SchemaImpl(collection, null); 297 schema.addField(sourceField, StringType.INSTANCE, null, 0, Collections.emptySet()); 298 schema.addField(targetField, StringType.INSTANCE, null, 0, Collections.emptySet()); 299 300 Consumer<Map<String, Object>> loader = map -> { 301 Document doc = MongoDBSerializationHelper.fieldMapToBson(map); 302 MongoCollection<Document> coll = session.getCollection(collection); 303 if (coll.count(doc) == 0) { 304 coll.insertOne(doc); 305 } 306 }; 307 DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR, schema, 308 loader); 309 } 310 311 protected MongoDBSession getMongoDBSession() throws DirectoryException { 312 if (!initialized) { 313 if (dataFileName != null) { 314 try (MongoDBSession session = (MongoDBSession) getSourceDirectory().getSession()) { 315 initializeSession(session); 316 } 317 } 318 initialized = true; 319 } 320 return (MongoDBSession) getSourceDirectory().getSession(); 321 } 322 323}