001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 *
019 */
020
021package org.nuxeo.directory.mongodb;
022
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.bson.Document;
import org.bson.conversions.Bson;
import org.nuxeo.ecm.core.schema.types.SchemaImpl;
import org.nuxeo.ecm.core.schema.types.primitives.StringType;
import org.nuxeo.ecm.directory.AbstractReference;
import org.nuxeo.ecm.directory.BaseDirectoryDescriptor;
import org.nuxeo.ecm.directory.DirectoryCSVLoader;
import org.nuxeo.ecm.directory.DirectoryException;
import org.nuxeo.ecm.directory.Reference;
import org.nuxeo.ecm.directory.ReferenceDescriptor;
import org.nuxeo.ecm.directory.Session;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoWriteException;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.DeleteResult;
052
053/**
054 * MongoDB implementation of a {@link Reference}
055 * 
056 * @since 9.1
057 */
058public class MongoDBReference extends AbstractReference {
059
060    protected String collection;
061
062    protected String sourceField;
063
064    protected String targetField;
065
066    protected String dataFileName;
067
068    private boolean initialized;
069
070    /**
071     * @since 9.2
072     */
073    public MongoDBReference(String field, String directory, String collection, String sourceField, String targetField,
074            String dataFileName) {
075        super(field, directory);
076        this.collection = collection;
077        this.sourceField = sourceField;
078        this.targetField = targetField;
079        this.dataFileName = dataFileName;
080    }
081
082    /**
083     * @since 9.2
084     */
085    public MongoDBReference(MongoDBReferenceDescriptor descriptor) {
086        this(descriptor.getFieldName(), descriptor.getTargetDirectoryName(), descriptor.getCollection(),
087                descriptor.getSourceField(), descriptor.getTargetField(), descriptor.getDataFileName());
088    }
089
090    /**
091     * @since 9.2
092     */
093    public MongoDBReference(ReferenceDescriptor descriptor) {
094        this(descriptor.getFieldName(), descriptor.getDirectory(), descriptor.getReferenceName(),
095                descriptor.getSource(), descriptor.getTarget(), descriptor.getDataFileName());
096    }
097
098    @Override
099    public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException {
100        try (MongoDBSession session = getMongoDBSession()) {
101            addLinks(sourceId, targetIds, session);
102        }
103    }
104
105    @Override
106    public void addLinks(String sourceId, List<String> targetIds, Session session) throws DirectoryException {
107        MongoDBSession mongoSession = (MongoDBSession) session;
108        if (!initialized) {
109            if (dataFileName != null) {
110                initializeSession(mongoSession);
111            }
112            initialized = true;
113        }
114        if (targetIds == null || targetIds.isEmpty()) {
115            return;
116        }
117        try {
118            MongoCollection<Document> coll = mongoSession.getCollection(collection);
119            List<Document> newDocs = targetIds.stream()
120                                              .map(targetId -> buildDoc(sourceId, targetId))
121                                              .filter(doc -> coll.count(doc) == 0)
122                                              .collect(Collectors.toList());
123            if (!newDocs.isEmpty()) {
124                coll.insertMany(newDocs);
125            }
126        } catch (MongoWriteException e) {
127            throw new DirectoryException(e);
128        }
129    }
130
131    @Override
132    public void addLinks(List<String> sourceIds, String targetId, Session session) throws DirectoryException {
133        MongoDBSession mongodbSession = (MongoDBSession) session;
134        MongoCollection<Document> coll = mongodbSession.getCollection(collection);
135        List<Document> newDocs = sourceIds.stream()
136                                          .map(sourceId -> buildDoc(sourceId, targetId))
137                                          .filter(doc -> coll.count(doc) == 0)
138                                          .collect(Collectors.toList());
139        if (!newDocs.isEmpty()) {
140            coll.insertMany(newDocs);
141        }
142    }
143
144    @Override
145    public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException {
146        if (sourceIds == null || sourceIds.isEmpty()) {
147            return;
148        }
149        try (MongoDBSession session = getMongoDBSession()) {
150            addLinks(sourceIds, targetId, session);
151        } catch (MongoWriteException e) {
152            throw new DirectoryException(e);
153        }
154    }
155
156    @Override
157    public void removeLinksForSource(String sourceId) throws DirectoryException {
158        try (MongoDBSession session = getMongoDBSession()) {
159            removeLinksForSource(sourceId, session);
160        }
161    }
162
163    @Override
164    public void removeLinksForSource(String sourceId, Session session) {
165        removeLinksFor(sourceField, sourceId, (MongoDBSession) session);
166    }
167
168    @Override
169    public void removeLinksForTarget(String targetId) throws DirectoryException {
170        try (MongoDBSession session = getMongoDBSession()) {
171            removeLinksFor(targetField, targetId, session);
172        }
173    }
174
175    @Override
176    public void removeLinksForTarget(String targetId, Session session) throws DirectoryException {
177        removeLinksFor(targetField, targetId, (MongoDBSession) session);
178    }
179
180    private void removeLinksFor(String field, String value, MongoDBSession session) {
181        try {
182            DeleteResult result = session.getCollection(collection)
183                                         .deleteMany(MongoDBSerializationHelper.fieldMapToBson(field, value));
184            if (!result.wasAcknowledged()) {
185                throw new DirectoryException(
186                        "Error while deleting the entry, the request has not been acknowledged by the server");
187            }
188        } catch (MongoWriteException e) {
189            throw new DirectoryException(e);
190        }
191    }
192
193    @Override
194    public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException {
195        try (MongoDBSession session = getMongoDBSession()) {
196            return getIdsFor(sourceField, sourceId, targetField, session);
197        }
198    }
199
200    /**
201     * Retrieves all target ids associated to the given source id
202     * 
203     * @param sourceId the source id
204     * @param session the mongoDB session
205     * @return the list of target ids
206     * @throws DirectoryException
207     */
208    public List<String> getTargetIdsForSource(String sourceId, MongoDBSession session) throws DirectoryException {
209        return getIdsFor(sourceField, sourceId, targetField, session);
210    }
211
212    @Override
213    public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException {
214        try (MongoDBSession session = getMongoDBSession()) {
215            return getIdsFor(targetField, targetId, sourceField, session);
216        }
217    }
218
219    private List<String> getIdsFor(String queryField, String value, String resultField, MongoDBSession session) {
220        FindIterable<Document> docs = session.getCollection(collection)
221                                             .find(MongoDBSerializationHelper.fieldMapToBson(queryField, value));
222        return StreamSupport.stream(docs.spliterator(), false)
223                            .map(doc -> doc.getString(resultField))
224                            .collect(Collectors.toList());
225    }
226
227    @Override
228    public void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException {
229        try (MongoDBSession session = getMongoDBSession()) {
230            setTargetIdsForSource(sourceId, targetIds, session);
231        }
232    }
233
234    @Override
235    public void setTargetIdsForSource(String sourceId, List<String> targetIds, Session session)
236            throws DirectoryException {
237        setIdsFor(sourceField, sourceId, targetField, targetIds, (MongoDBSession) session);
238    }
239
240    @Override
241    public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException {
242        try (MongoDBSession session = getMongoDBSession()) {
243            setIdsFor(targetField, targetId, sourceField, sourceIds, session);
244        }
245    }
246
247    @Override
248    public void setSourceIdsForTarget(String targetId, List<String> sourceIds, Session session)
249            throws DirectoryException {
250        setIdsFor(targetField, targetId, sourceField, sourceIds, (MongoDBSession) session);
251    }
252
253    private void setIdsFor(String field, String value, String fieldToUpdate, List<String> ids, MongoDBSession session) {
254        Set<String> idsToAdd = new HashSet<>();
255        if (ids != null) {
256            idsToAdd.addAll(ids);
257        }
258        List<String> idsToDelete = new ArrayList<>();
259
260        List<String> existingIds = getIdsFor(field, value, fieldToUpdate, session);
261        for (String id : existingIds) {
262            if (!idsToAdd.remove(id)) {
263                idsToDelete.add(id);
264            }
265        }
266
267        if (!idsToDelete.isEmpty()) {
268            BasicDBList list = new BasicDBList();
269            if (sourceField.equals(field)) {
270                list.addAll(idsToDelete.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList()));
271            } else {
272                list.addAll(idsToDelete.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList()));
273            }
274            Bson deleteDoc = new BasicDBObject("$or", list);
275            session.getCollection(collection).deleteMany(deleteDoc);
276        }
277
278        if (!idsToAdd.isEmpty()) {
279            List<Document> list;
280            if (sourceField.equals(field)) {
281                list = idsToAdd.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList());
282            } else {
283                list = idsToAdd.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList());
284            }
285            session.getCollection(collection).insertMany(list);
286        }
287    }
288
289    private Document buildDoc(String sourceId, String targetId) {
290        Map<String, Object> fieldMap = new HashMap<>();
291        fieldMap.put(sourceField, sourceId);
292        fieldMap.put(targetField, targetId);
293        return MongoDBSerializationHelper.fieldMapToBson(fieldMap);
294    }
295
296    protected void initializeSession(MongoDBSession session) {
297        // fake schema for DirectoryCSVLoader.loadData
298        SchemaImpl schema = new SchemaImpl(collection, null);
299        schema.addField(sourceField, StringType.INSTANCE, null, 0, Collections.emptySet());
300        schema.addField(targetField, StringType.INSTANCE, null, 0, Collections.emptySet());
301
302        Consumer<Map<String, Object>> loader = map -> {
303            Document doc = MongoDBSerializationHelper.fieldMapToBson(map);
304            MongoCollection<Document> coll = session.getCollection(collection);
305            if (coll.count(doc) == 0) {
306                coll.insertOne(doc);
307            }
308        };
309        DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR, schema,
310                loader);
311    }
312
313    protected MongoDBSession getMongoDBSession() throws DirectoryException {
314        if (!initialized) {
315            if (dataFileName != null) {
316                try (MongoDBSession session = (MongoDBSession) getSourceDirectory().getSession()) {
317                    initializeSession(session);
318                }
319            }
320            initialized = true;
321        }
322        return (MongoDBSession) getSourceDirectory().getSession();
323    }
324
325}