001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 * 019 */ 020 021package org.nuxeo.directory.mongodb; 022 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.function.Consumer; 031import java.util.stream.Collectors; 032import java.util.stream.StreamSupport; 033 034import org.bson.Document; 035import org.bson.conversions.Bson; 036import org.nuxeo.ecm.core.schema.types.SchemaImpl; 037import org.nuxeo.ecm.core.schema.types.primitives.StringType; 038import org.nuxeo.ecm.directory.AbstractReference; 039import org.nuxeo.ecm.directory.BaseDirectoryDescriptor; 040import org.nuxeo.ecm.directory.DirectoryCSVLoader; 041import org.nuxeo.ecm.directory.DirectoryException; 042import org.nuxeo.ecm.directory.Reference; 043 044import com.mongodb.BasicDBList; 045import com.mongodb.BasicDBObject; 046import com.mongodb.MongoWriteException; 047import com.mongodb.client.FindIterable; 048import com.mongodb.client.MongoCollection; 049import com.mongodb.client.result.DeleteResult; 050import org.nuxeo.ecm.directory.ReferenceDescriptor; 051import org.nuxeo.ecm.directory.Session; 052 053/** 054 * MongoDB implementation of a {@link Reference} 055 * 056 * @since 9.1 057 */ 058public class MongoDBReference extends AbstractReference { 059 060 protected String collection; 061 062 protected String sourceField; 063 064 protected String targetField; 065 066 protected String dataFileName; 067 068 /** 069 * @since 9.2 070 */ 071 public MongoDBReference(String field, String directory, String collection, String sourceField, String targetField, 072 String dataFileName) { 073 super(field, directory); 074 this.collection = collection; 075 this.sourceField = sourceField; 076 this.targetField = targetField; 077 this.dataFileName = dataFileName; 078 } 079 080 /** 081 * @since 9.2 082 */ 083 public MongoDBReference(MongoDBReferenceDescriptor descriptor) { 084 this(descriptor.getFieldName(), descriptor.getTargetDirectoryName(), descriptor.getCollection(), 085 descriptor.getSourceField(), descriptor.getTargetField(), descriptor.getDataFileName()); 086 } 087 088 /** 089 * @since 9.2 090 */ 091 public MongoDBReference(ReferenceDescriptor descriptor) { 092 this(descriptor.getFieldName(), descriptor.getDirectory(), descriptor.getReferenceName(), 093 descriptor.getSource(), descriptor.getTarget(), descriptor.getDataFileName()); 094 } 095 096 @Override 097 public void addLinks(String sourceId, List<String> targetIds) { 098 try (MongoDBSession session = getMongoDBSession()) { 099 addLinks(sourceId, targetIds, session); 100 } 101 } 102 103 @Override 104 public void addLinks(String sourceId, List<String> targetIds, Session session) { 105 if (targetIds == null || targetIds.isEmpty()) { 106 return; 107 } 108 try { 109 MongoDBSession mongoSession = (MongoDBSession) session; 110 MongoCollection<Document> coll = getCollection(mongoSession); 111 List<Document> newDocs = targetIds.stream() 112 .map(targetId -> buildDoc(sourceId, targetId)) 113 .filter(doc -> coll.countDocuments(doc) == 0) 114 .collect(Collectors.toList()); 115 if (!newDocs.isEmpty()) { 116 coll.insertMany(newDocs); 117 } 118 } catch (MongoWriteException e) { 119 throw new DirectoryException(e); 120 } 121 } 122 123 @Override 124 public void addLinks(List<String> sourceIds, String targetId, Session session) { 125 MongoDBSession mongodbSession = (MongoDBSession) session; 126 MongoCollection<Document> coll = getCollection(mongodbSession); 127 List<Document> newDocs = sourceIds.stream() 128 .map(sourceId -> buildDoc(sourceId, targetId)) 129 .filter(doc -> coll.countDocuments(doc) == 0) 130 .collect(Collectors.toList()); 131 if (!newDocs.isEmpty()) { 132 coll.insertMany(newDocs); 133 } 134 } 135 136 @Override 137 public void addLinks(List<String> sourceIds, String targetId) { 138 if (sourceIds == null || sourceIds.isEmpty()) { 139 return; 140 } 141 try (MongoDBSession session = getMongoDBSession()) { 142 addLinks(sourceIds, targetId, session); 143 } catch (MongoWriteException e) { 144 throw new DirectoryException(e); 145 } 146 } 147 148 @Override 149 public void removeLinksForSource(String sourceId) { 150 try (MongoDBSession session = getMongoDBSession()) { 151 removeLinksForSource(sourceId, session); 152 } 153 } 154 155 @Override 156 public void removeLinksForSource(String sourceId, Session session) { 157 removeLinksFor(sourceField, sourceId, (MongoDBSession) session); 158 } 159 160 @Override 161 public void removeLinksForTarget(String targetId) { 162 try (MongoDBSession session = getMongoDBSession()) { 163 removeLinksFor(targetField, targetId, session); 164 } 165 } 166 167 @Override 168 public void removeLinksForTarget(String targetId, Session session) { 169 removeLinksFor(targetField, targetId, (MongoDBSession) session); 170 } 171 172 private void removeLinksFor(String field, String value, MongoDBSession session) { 173 try { 174 DeleteResult result = getCollection(session).deleteMany( 175 MongoDBSerializationHelper.fieldMapToBson(field, value)); 176 if (!result.wasAcknowledged()) { 177 throw new DirectoryException( 178 "Error while deleting the entry, the request has not been acknowledged by the server"); 179 } 180 } catch (MongoWriteException e) { 181 throw new DirectoryException(e); 182 } 183 } 184 185 @Override 186 public List<String> getTargetIdsForSource(String sourceId) { 187 try (MongoDBSession session = getMongoDBSession()) { 188 return getIdsFor(sourceField, sourceId, targetField, session); 189 } 190 } 191 192 /** 193 * Retrieves all target ids associated to the given source id 194 * 195 * @param sourceId the source id 196 * @param session the mongoDB session 197 * @return the list of target ids 198 */ 199 public List<String> getTargetIdsForSource(String sourceId, MongoDBSession session) { 200 return getIdsFor(sourceField, sourceId, targetField, session); 201 } 202 203 @Override 204 public List<String> getSourceIdsForTarget(String targetId) { 205 try (MongoDBSession session = getMongoDBSession()) { 206 return getIdsFor(targetField, targetId, sourceField, session); 207 } 208 } 209 210 private List<String> getIdsFor(String queryField, String value, String resultField, MongoDBSession session) { 211 FindIterable<Document> docs = getCollection(session).find( 212 MongoDBSerializationHelper.fieldMapToBson(queryField, value)); 213 return StreamSupport.stream(docs.spliterator(), false) 214 .map(doc -> doc.getString(resultField)) 215 .collect(Collectors.toList()); 216 } 217 218 @Override 219 public void setTargetIdsForSource(String sourceId, List<String> targetIds) { 220 try (MongoDBSession session = getMongoDBSession()) { 221 setTargetIdsForSource(sourceId, targetIds, session); 222 } 223 } 224 225 @Override 226 public void setTargetIdsForSource(String sourceId, List<String> targetIds, Session session) { 227 setIdsFor(sourceField, sourceId, targetField, targetIds, (MongoDBSession) session); 228 } 229 230 @Override 231 public void setSourceIdsForTarget(String targetId, List<String> sourceIds) { 232 try (MongoDBSession session = getMongoDBSession()) { 233 setIdsFor(targetField, targetId, sourceField, sourceIds, session); 234 } 235 } 236 237 @Override 238 public void setSourceIdsForTarget(String targetId, List<String> sourceIds, Session session) { 239 setIdsFor(targetField, targetId, sourceField, sourceIds, (MongoDBSession) session); 240 } 241 242 private void setIdsFor(String field, String value, String fieldToUpdate, List<String> ids, MongoDBSession session) { 243 Set<String> idsToAdd = new HashSet<>(); 244 if (ids != null) { 245 idsToAdd.addAll(ids); 246 } 247 List<String> idsToDelete = new ArrayList<>(); 248 249 List<String> existingIds = getIdsFor(field, value, fieldToUpdate, session); 250 for (String id : existingIds) { 251 if (!idsToAdd.remove(id)) { 252 idsToDelete.add(id); 253 } 254 } 255 256 if (!idsToDelete.isEmpty()) { 257 BasicDBList list = new BasicDBList(); 258 if (sourceField.equals(field)) { 259 list.addAll(idsToDelete.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList())); 260 } else { 261 list.addAll(idsToDelete.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList())); 262 } 263 Bson deleteDoc = new BasicDBObject("$or", list); 264 getCollection(session).deleteMany(deleteDoc); 265 } 266 267 if (!idsToAdd.isEmpty()) { 268 List<Document> list; 269 if (sourceField.equals(field)) { 270 list = idsToAdd.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList()); 271 } else { 272 list = idsToAdd.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList()); 273 } 274 getCollection(session).insertMany(list); 275 } 276 } 277 278 private Document buildDoc(String sourceId, String targetId) { 279 Map<String, Object> fieldMap = new HashMap<>(); 280 fieldMap.put(sourceField, sourceId); 281 fieldMap.put(targetField, targetId); 282 return MongoDBSerializationHelper.fieldMapToBson(fieldMap); 283 } 284 285 protected void initialize(MongoDBSession session) { 286 if (dataFileName != null) { 287 // fake schema for DirectoryCSVLoader.loadData 288 SchemaImpl schema = new SchemaImpl(collection, null); 289 schema.addField(sourceField, StringType.INSTANCE, null, 0, Collections.emptySet()); 290 schema.addField(targetField, StringType.INSTANCE, null, 0, Collections.emptySet()); 291 Consumer<Map<String, Object>> loader = map -> { 292 Document doc = MongoDBSerializationHelper.fieldMapToBson(map); 293 MongoCollection<Document> coll = getCollection(session); 294 if (coll.countDocuments(doc) == 0) { 295 coll.insertOne(doc); 296 } 297 }; 298 DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR, 299 schema, loader); 300 } 301 } 302 303 protected MongoDBSession getMongoDBSession() { 304 return (MongoDBSession) getSourceDirectory().getSession(); 305 } 306 307 protected MongoCollection<Document> getCollection(MongoDBSession session) { 308 return session.getDirectory().database.getCollection(collection); 309 } 310 311}