/*
 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Funsho David
 *
 */

package org.nuxeo.directory.mongodb;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoWriteException;
import com.mongodb.client.FindIterable;
import com.mongodb.client.result.DeleteResult;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.nuxeo.common.xmap.annotation.XNode;
import org.nuxeo.common.xmap.annotation.XObject;
import org.nuxeo.ecm.core.schema.types.SchemaImpl;
import org.nuxeo.ecm.core.schema.types.primitives.StringType;
import org.nuxeo.ecm.directory.AbstractReference;
import org.nuxeo.ecm.directory.BaseDirectoryDescriptor;
import org.nuxeo.ecm.directory.DirectoryCSVLoader;
import org.nuxeo.ecm.directory.DirectoryException;
import org.nuxeo.ecm.directory.Reference;
import org.nuxeo.mongodb.core.MongoDBSerializationHelper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * MongoDB implementation of a {@link Reference}.
 * <p>
 * Each link is stored as one document of {@link #collection} holding two string entries: {@link #sourceField} (the
 * source id) and {@link #targetField} (the target id).
 *
 * @since 9.1
 */
@XObject("reference")
public class MongoDBReference extends AbstractReference implements Cloneable {

    /** Name of the MongoDB collection in which the link documents are stored. */
    @XNode("@collection")
    protected String collection;

    /** Key of the link document entry holding the source id. */
    @XNode("@sourceField")
    protected String sourceField;

    /** Key of the link document entry holding the target id. */
    @XNode("@targetField")
    protected String targetField;

    /** Optional data file handed to {@link DirectoryCSVLoader} the first time a session is requested. */
    @XNode("@dataFile")
    protected String dataFileName;

    /** Set once the first-use initialization of {@link #getMongoDBSession()} has completed. */
    private boolean initialized;

    @XNode("@field")
    public void setFieldName(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    @XNode("@directory")
    public void setTargetDirectoryName(String targetDirectoryName) {
        this.targetDirectoryName = targetDirectoryName;
    }

    @Override
    public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException {
        if (targetIds == null) {
            return;
        }
        try (MongoDBSession session = getMongoDBSession()) {
            addLinks(sourceId, targetIds, session);
        }
    }

    /**
     * Adds the links between the source id and the target ids.
     * <p>
     * Links for which an identical document is already counted in the collection are skipped. Nothing is written
     * when {@code targetIds} is {@code null} or empty, or when every link already exists.
     *
     * @param sourceId the source id
     * @param targetIds the target ids, may be {@code null}
     * @param session the mongoDB session
     * @throws DirectoryException if the insertion fails with a {@link MongoWriteException}
     */
    public void addLinks(String sourceId, List<String> targetIds, MongoDBSession session) throws DirectoryException {
        if (targetIds == null || targetIds.isEmpty()) {
            return;
        }
        try {
            List<Document> newDocs = targetIds.stream()
                                              .map(targetId -> buildDoc(sourceId, targetId))
                                              .filter(doc -> session.getCollection(collection).count(doc) == 0)
                                              .collect(Collectors.toList());
            // insertMany rejects an empty list, which happens when every link already exists
            if (!newDocs.isEmpty()) {
                session.getCollection(collection).insertMany(newDocs);
            }
        } catch (MongoWriteException e) {
            throw new DirectoryException(e);
        }
    }

    @Override
    public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException {
        if (sourceIds == null) {
            return;
        }
        try (MongoDBSession session = getMongoDBSession()) {
            List<Document> newDocs = sourceIds.stream()
                                              .map(sourceId -> buildDoc(sourceId, targetId))
                                              .filter(doc -> session.getCollection(collection).count(doc) == 0)
                                              .collect(Collectors.toList());
            // insertMany rejects an empty list, which happens when sourceIds is empty or every link already exists
            if (!newDocs.isEmpty()) {
                session.getCollection(collection).insertMany(newDocs);
            }
        } catch (MongoWriteException e) {
            throw new DirectoryException(e);
        }
    }

    @Override
    public void removeLinksForSource(String sourceId) throws DirectoryException {
        try (MongoDBSession session = getMongoDBSession()) {
            removeLinksForSource(sourceId, session);
        }
    }

    /**
     * Removes all the links for a given source id.
     *
     * @param sourceId the source id
     * @param session the mongoDB session
     */
    public void removeLinksForSource(String sourceId, MongoDBSession session) {
        removeLinksFor(sourceField, sourceId, session);
    }

    @Override
    public void removeLinksForTarget(String targetId) throws DirectoryException {
        try (MongoDBSession session = getMongoDBSession()) {
            removeLinksFor(targetField, targetId, session);
        }
    }

    /**
     * Deletes every link document whose {@code field} entry equals {@code value}.
     *
     * @param field the document key to match on (source or target field)
     * @param value the id to match
     * @param session the mongoDB session
     * @throws DirectoryException if the deletion is not acknowledged or fails with a {@link MongoWriteException}
     */
    private void removeLinksFor(String field, String value, MongoDBSession session) {
        try {
            DeleteResult result = session.getCollection(collection)
                                         .deleteMany(MongoDBSerializationHelper.fieldMapToBson(field, value));
            if (!result.wasAcknowledged()) {
                throw new DirectoryException(
                        "Error while deleting the entry, the request has not been acknowledged by the server");
            }
        } catch (MongoWriteException e) {
            throw new DirectoryException(e);
        }
    }

    @Override
    public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException {
        try (MongoDBSession session = getMongoDBSession()) {
            return getIdsFor(sourceField, sourceId, targetField, session);
        }
    }

    /**
     * Retrieves all target ids associated to the given source id.
     *
     * @param sourceId the source id
     * @param session the mongoDB session
     * @return the list of target ids
     * @throws DirectoryException declared for callers; not thrown directly by this method
     */
    public List<String> getTargetIdsForSource(String sourceId, MongoDBSession session) throws DirectoryException {
        return getIdsFor(sourceField, sourceId, targetField, session);
    }

    @Override
    public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException {
        try (MongoDBSession session = getMongoDBSession()) {
            return getIdsFor(targetField, targetId, sourceField, session);
        }
    }

    /**
     * Finds the link documents whose {@code queryField} entry equals {@code value} and extracts their
     * {@code resultField} entry.
     *
     * @param queryField the document key to match on
     * @param value the id to match
     * @param resultField the document key whose string value is collected
     * @param session the mongoDB session
     * @return the collected ids, one per matching document
     */
    private List<String> getIdsFor(String queryField, String value, String resultField, MongoDBSession session) {
        FindIterable<Document> docs = session.getCollection(collection)
                                             .find(MongoDBSerializationHelper.fieldMapToBson(queryField, value));
        return StreamSupport.stream(docs.spliterator(), false)
                            .map(doc -> doc.getString(resultField))
                            .collect(Collectors.toList());
    }

    @Override
    public void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException {
        try (MongoDBSession session = getMongoDBSession()) {
            setTargetIdsForSource(sourceId, targetIds, session);
        }
    }

    /**
     * Sets all target ids to be associated to the given source id.
     *
     * @param sourceId the source id
     * @param targetIds the target ids, {@code null} being handled as no ids
     * @param session the mongoDB session
     * @throws DirectoryException declared for callers; not thrown directly by this method
     */
    public void setTargetIdsForSource(String sourceId, List<String> targetIds, MongoDBSession session)
            throws DirectoryException {
        setIdsFor(sourceField, sourceId, targetField, targetIds, session);
    }

    @Override
    public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException {
        try (MongoDBSession session = getMongoDBSession()) {
            setIdsFor(targetField, targetId, sourceField, sourceIds, session);
        }
    }

    /**
     * Makes the set of ids linked to {@code value} equal to {@code ids}: existing links that are not requested are
     * deleted, requested links that do not exist yet are inserted, and links present on both sides are left alone.
     *
     * @param field the document key holding {@code value} (source or target field)
     * @param value the id whose links are being replaced
     * @param fieldToUpdate the document key holding the ids on the other side of the link
     * @param ids the ids that must be linked to {@code value}, {@code null} being handled as no ids
     * @param session the mongoDB session
     */
    private void setIdsFor(String field, String value, String fieldToUpdate, List<String> ids, MongoDBSession session) {
        Set<String> idsToAdd = new HashSet<>();
        if (ids != null) {
            idsToAdd.addAll(ids);
        }
        List<String> idsToDelete = new ArrayList<>();

        // an existing id that is also requested needs no write; any other existing id must be deleted
        List<String> existingIds = getIdsFor(field, value, fieldToUpdate, session);
        for (String id : existingIds) {
            if (!idsToAdd.remove(id)) {
                idsToDelete.add(id);
            }
        }

        // buildDoc expects (sourceId, targetId): value is the source id only when matching on the source field
        boolean valueIsSource = sourceField.equals(field);

        if (!idsToDelete.isEmpty()) {
            BasicDBList list = new BasicDBList();
            list.addAll(idsToDelete.stream()
                                   .map(id -> valueIsSource ? buildDoc(value, id) : buildDoc(id, value))
                                   .collect(Collectors.toList()));
            Bson deleteDoc = new BasicDBObject("$or", list);
            session.getCollection(collection).deleteMany(deleteDoc);
        }

        if (!idsToAdd.isEmpty()) {
            List<Document> list = idsToAdd.stream()
                                          .map(id -> valueIsSource ? buildDoc(value, id) : buildDoc(id, value))
                                          .collect(Collectors.toList());
            session.getCollection(collection).insertMany(list);
        }
    }

    /**
     * Builds the document representing one link.
     *
     * @param sourceId the value stored under {@link #sourceField}
     * @param targetId the value stored under {@link #targetField}
     * @return the link document
     */
    private Document buildDoc(String sourceId, String targetId) {
        Map<String, Object> fieldMap = new HashMap<>();
        fieldMap.put(sourceField, sourceId);
        fieldMap.put(targetField, targetId);
        return MongoDBSerializationHelper.fieldMapToBson(fieldMap);
    }

    @Override
    public MongoDBReference clone() {
        return (MongoDBReference) super.clone();
    }

    /**
     * Opens a session on the source directory.
     * <p>
     * On the first call, when a data file is configured, its rows are first inserted into {@link #collection}
     * through a dedicated session that is closed before the returned session is opened.
     * <p>
     * NOTE(review): the {@code initialized} flag is neither volatile nor guarded by a lock; confirm that callers do
     * not reach this method concurrently.
     *
     * @return a new session, to be closed by the caller
     * @throws DirectoryException declared for callers; presumably raised by the directory or the CSV loader
     */
    protected MongoDBSession getMongoDBSession() throws DirectoryException {
        if (!initialized) {
            if (dataFileName != null) {
                try (MongoDBSession session = (MongoDBSession) getSourceDirectory().getSession()) {
                    // fake schema for DirectoryCSVLoader.loadData
                    SchemaImpl schema = new SchemaImpl(collection, null);
                    schema.addField(sourceField, StringType.INSTANCE, null, 0, Collections.emptySet());
                    schema.addField(targetField, StringType.INSTANCE, null, 0, Collections.emptySet());
                    DirectoryCSVLoader.loadData(dataFileName,
                            BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR, schema,
                            map -> session.getCollection(collection)
                                          .insertOne(MongoDBSerializationHelper.fieldMapToBson(map)));
                }
            }
            initialized = true;
        }
        return (MongoDBSession) getSourceDirectory().getSession();
    }

}