001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 *
019 */
020
021package org.nuxeo.directory.mongodb;
022
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.function.Consumer;
031import java.util.stream.Collectors;
032import java.util.stream.StreamSupport;
033
034import org.bson.Document;
035import org.bson.conversions.Bson;
036import org.nuxeo.ecm.core.schema.types.SchemaImpl;
037import org.nuxeo.ecm.core.schema.types.primitives.StringType;
038import org.nuxeo.ecm.directory.AbstractReference;
039import org.nuxeo.ecm.directory.BaseDirectoryDescriptor;
040import org.nuxeo.ecm.directory.DirectoryCSVLoader;
041import org.nuxeo.ecm.directory.DirectoryException;
042import org.nuxeo.ecm.directory.Reference;
043
044import com.mongodb.BasicDBList;
045import com.mongodb.BasicDBObject;
046import com.mongodb.MongoWriteException;
047import com.mongodb.client.FindIterable;
048import com.mongodb.client.MongoCollection;
049import com.mongodb.client.result.DeleteResult;
050import org.nuxeo.ecm.directory.ReferenceDescriptor;
051import org.nuxeo.ecm.directory.Session;
052
053/**
054 * MongoDB implementation of a {@link Reference}
055 * 
056 * @since 9.1
057 */
058public class MongoDBReference extends AbstractReference {
059
060    protected String collection;
061
062    protected String sourceField;
063
064    protected String targetField;
065
066    protected String dataFileName;
067
068    private boolean initialized;
069
070    /**
071     * @since 9.2
072     */
073    public MongoDBReference(String field, String directory, String collection, String sourceField, String targetField,
074            String dataFileName) {
075        super(field, directory);
076        this.collection = collection;
077        this.sourceField = sourceField;
078        this.targetField = targetField;
079        this.dataFileName = dataFileName;
080    }
081
082    /**
083     * @since 9.2
084     */
085    public MongoDBReference(MongoDBReferenceDescriptor descriptor) {
086        this(descriptor.getFieldName(), descriptor.getTargetDirectoryName(), descriptor.getCollection(),
087                descriptor.getSourceField(), descriptor.getTargetField(), descriptor.getDataFileName());
088    }
089
090    /**
091     * @since 9.2
092     */
093    public MongoDBReference(ReferenceDescriptor descriptor) {
094        this(descriptor.getFieldName(), descriptor.getDirectory(), descriptor.getReferenceName(),
095                descriptor.getSource(), descriptor.getTarget(), descriptor.getDataFileName());
096    }
097
098    @Override
099    public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException {
100        try (MongoDBSession session = getMongoDBSession()) {
101            addLinks(sourceId, targetIds, session);
102        }
103    }
104
105    @Override
106    public void addLinks(String sourceId, List<String> targetIds, Session session) throws DirectoryException {
107        MongoDBSession mongoSession = (MongoDBSession) session;
108        if (!initialized) {
109            if (dataFileName != null) {
110                initializeSession(mongoSession);
111            }
112            initialized = true;
113        }
114        if (targetIds == null || targetIds.isEmpty()) {
115            return;
116        }
117        try {
118            MongoCollection<Document> coll = mongoSession.getCollection(collection);
119            List<Document> newDocs = targetIds.stream()
120                                              .map(targetId -> buildDoc(sourceId, targetId))
121                                              .filter(doc -> coll.count(doc) == 0)
122                                              .collect(Collectors.toList());
123            coll.insertMany(newDocs);
124        } catch (MongoWriteException e) {
125            throw new DirectoryException(e);
126        }
127    }
128
129    @Override
130    public void addLinks(List<String> sourceIds, String targetId, Session session) throws DirectoryException {
131        MongoDBSession mongodbSession = (MongoDBSession) session;
132        MongoCollection<Document> coll = mongodbSession.getCollection(collection);
133        List<Document> newDocs = sourceIds.stream()
134                                          .map(sourceId -> buildDoc(sourceId, targetId))
135                                          .filter(doc -> coll.count(doc) == 0)
136                                          .collect(Collectors.toList());
137        if (!newDocs.isEmpty()) {
138            coll.insertMany(newDocs);
139        }
140    }
141
142    @Override
143    public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException {
144        if (sourceIds == null || sourceIds.isEmpty()) {
145            return;
146        }
147        try (MongoDBSession session = getMongoDBSession()) {
148            addLinks(sourceIds, targetId, session);
149        } catch (MongoWriteException e) {
150            throw new DirectoryException(e);
151        }
152    }
153
154    @Override
155    public void removeLinksForSource(String sourceId) throws DirectoryException {
156        try (MongoDBSession session = getMongoDBSession()) {
157            removeLinksForSource(sourceId, session);
158        }
159    }
160
161    @Override
162    public void removeLinksForSource(String sourceId, Session session) {
163        removeLinksFor(sourceField, sourceId, (MongoDBSession) session);
164    }
165
166    @Override
167    public void removeLinksForTarget(String targetId) throws DirectoryException {
168        try (MongoDBSession session = getMongoDBSession()) {
169            removeLinksFor(targetField, targetId, session);
170        }
171    }
172
173    @Override
174    public void removeLinksForTarget(String targetId, Session session) throws DirectoryException {
175        removeLinksFor(targetField, targetId, (MongoDBSession) session);
176    }
177
178    private void removeLinksFor(String field, String value, MongoDBSession session) {
179        try {
180            DeleteResult result = session.getCollection(collection)
181                                         .deleteMany(MongoDBSerializationHelper.fieldMapToBson(field, value));
182            if (!result.wasAcknowledged()) {
183                throw new DirectoryException(
184                        "Error while deleting the entry, the request has not been acknowledged by the server");
185            }
186        } catch (MongoWriteException e) {
187            throw new DirectoryException(e);
188        }
189    }
190
191    @Override
192    public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException {
193        try (MongoDBSession session = getMongoDBSession()) {
194            return getIdsFor(sourceField, sourceId, targetField, session);
195        }
196    }
197
198    /**
199     * Retrieves all target ids associated to the given source id
200     * 
201     * @param sourceId the source id
202     * @param session the mongoDB session
203     * @return the list of target ids
204     * @throws DirectoryException
205     */
206    public List<String> getTargetIdsForSource(String sourceId, MongoDBSession session) throws DirectoryException {
207        return getIdsFor(sourceField, sourceId, targetField, session);
208    }
209
210    @Override
211    public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException {
212        try (MongoDBSession session = getMongoDBSession()) {
213            return getIdsFor(targetField, targetId, sourceField, session);
214        }
215    }
216
217    private List<String> getIdsFor(String queryField, String value, String resultField, MongoDBSession session) {
218        FindIterable<Document> docs = session.getCollection(collection)
219                                             .find(MongoDBSerializationHelper.fieldMapToBson(queryField, value));
220        return StreamSupport.stream(docs.spliterator(), false)
221                            .map(doc -> doc.getString(resultField))
222                            .collect(Collectors.toList());
223    }
224
225    @Override
226    public void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException {
227        try (MongoDBSession session = getMongoDBSession()) {
228            setTargetIdsForSource(sourceId, targetIds, session);
229        }
230    }
231
232    @Override
233    public void setTargetIdsForSource(String sourceId, List<String> targetIds, Session session)
234            throws DirectoryException {
235        setIdsFor(sourceField, sourceId, targetField, targetIds, (MongoDBSession) session);
236    }
237
238    @Override
239    public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException {
240        try (MongoDBSession session = getMongoDBSession()) {
241            setIdsFor(targetField, targetId, sourceField, sourceIds, session);
242        }
243    }
244
245    @Override
246    public void setSourceIdsForTarget(String targetId, List<String> sourceIds, Session session)
247            throws DirectoryException {
248        setIdsFor(targetField, targetId, sourceField, sourceIds, (MongoDBSession) session);
249    }
250
251    private void setIdsFor(String field, String value, String fieldToUpdate, List<String> ids, MongoDBSession session) {
252        Set<String> idsToAdd = new HashSet<>();
253        if (ids != null) {
254            idsToAdd.addAll(ids);
255        }
256        List<String> idsToDelete = new ArrayList<>();
257
258        List<String> existingIds = getIdsFor(field, value, fieldToUpdate, session);
259        for (String id : existingIds) {
260            if (!idsToAdd.remove(id)) {
261                idsToDelete.add(id);
262            }
263        }
264
265        if (!idsToDelete.isEmpty()) {
266            BasicDBList list = new BasicDBList();
267            if (sourceField.equals(field)) {
268                list.addAll(idsToDelete.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList()));
269            } else {
270                list.addAll(idsToDelete.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList()));
271            }
272            Bson deleteDoc = new BasicDBObject("$or", list);
273            session.getCollection(collection).deleteMany(deleteDoc);
274        }
275
276        if (!idsToAdd.isEmpty()) {
277            List<Document> list;
278            if (sourceField.equals(field)) {
279                list = idsToAdd.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList());
280            } else {
281                list = idsToAdd.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList());
282            }
283            session.getCollection(collection).insertMany(list);
284        }
285    }
286
287    private Document buildDoc(String sourceId, String targetId) {
288        Map<String, Object> fieldMap = new HashMap<>();
289        fieldMap.put(sourceField, sourceId);
290        fieldMap.put(targetField, targetId);
291        return MongoDBSerializationHelper.fieldMapToBson(fieldMap);
292    }
293
294    protected void initializeSession(MongoDBSession session) {
295        // fake schema for DirectoryCSVLoader.loadData
296        SchemaImpl schema = new SchemaImpl(collection, null);
297        schema.addField(sourceField, StringType.INSTANCE, null, 0, Collections.emptySet());
298        schema.addField(targetField, StringType.INSTANCE, null, 0, Collections.emptySet());
299
300        Consumer<Map<String, Object>> loader = map -> {
301            Document doc = MongoDBSerializationHelper.fieldMapToBson(map);
302            MongoCollection<Document> coll = session.getCollection(collection);
303            if (coll.count(doc) == 0) {
304                coll.insertOne(doc);
305            }
306        };
307        DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR, schema,
308                loader);
309    }
310
311    protected MongoDBSession getMongoDBSession() throws DirectoryException {
312        if (!initialized) {
313            if (dataFileName != null) {
314                try (MongoDBSession session = (MongoDBSession) getSourceDirectory().getSession()) {
315                    initializeSession(session);
316                }
317            }
318            initialized = true;
319        }
320        return (MongoDBSession) getSourceDirectory().getSession();
321    }
322
323}