001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 *
019 */
020
021package org.nuxeo.directory.mongodb;
022
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.function.Consumer;
031import java.util.stream.Collectors;
032import java.util.stream.StreamSupport;
033
034import org.bson.Document;
035import org.bson.conversions.Bson;
036import org.nuxeo.ecm.core.schema.types.SchemaImpl;
037import org.nuxeo.ecm.core.schema.types.primitives.StringType;
038import org.nuxeo.ecm.directory.AbstractReference;
039import org.nuxeo.ecm.directory.BaseDirectoryDescriptor;
040import org.nuxeo.ecm.directory.DirectoryCSVLoader;
041import org.nuxeo.ecm.directory.DirectoryException;
042import org.nuxeo.ecm.directory.Reference;
043
044import com.mongodb.BasicDBList;
045import com.mongodb.BasicDBObject;
046import com.mongodb.MongoWriteException;
047import com.mongodb.client.FindIterable;
048import com.mongodb.client.MongoCollection;
049import com.mongodb.client.result.DeleteResult;
050import org.nuxeo.ecm.directory.ReferenceDescriptor;
051import org.nuxeo.ecm.directory.Session;
052
053/**
054 * MongoDB implementation of a {@link Reference}
055 *
056 * @since 9.1
057 */
058public class MongoDBReference extends AbstractReference {
059
060    protected String collection;
061
062    protected String sourceField;
063
064    protected String targetField;
065
066    protected String dataFileName;
067
068    /**
069     * @since 9.2
070     */
071    public MongoDBReference(String field, String directory, String collection, String sourceField, String targetField,
072            String dataFileName) {
073        super(field, directory);
074        this.collection = collection;
075        this.sourceField = sourceField;
076        this.targetField = targetField;
077        this.dataFileName = dataFileName;
078    }
079
080    /**
081     * @since 9.2
082     */
083    public MongoDBReference(MongoDBReferenceDescriptor descriptor) {
084        this(descriptor.getFieldName(), descriptor.getTargetDirectoryName(), descriptor.getCollection(),
085                descriptor.getSourceField(), descriptor.getTargetField(), descriptor.getDataFileName());
086    }
087
088    /**
089     * @since 9.2
090     */
091    public MongoDBReference(ReferenceDescriptor descriptor) {
092        this(descriptor.getFieldName(), descriptor.getDirectory(), descriptor.getReferenceName(),
093                descriptor.getSource(), descriptor.getTarget(), descriptor.getDataFileName());
094    }
095
096    @Override
097    public void addLinks(String sourceId, List<String> targetIds) {
098        try (MongoDBSession session = getMongoDBSession()) {
099            addLinks(sourceId, targetIds, session);
100        }
101    }
102
103    @Override
104    public void addLinks(String sourceId, List<String> targetIds, Session session) {
105        if (targetIds == null || targetIds.isEmpty()) {
106            return;
107        }
108        try {
109            MongoDBSession mongoSession = (MongoDBSession) session;
110            MongoCollection<Document> coll = getCollection(mongoSession);
111            List<Document> newDocs = targetIds.stream()
112                                              .map(targetId -> buildDoc(sourceId, targetId))
113                                              .filter(doc -> coll.count(doc) == 0)
114                                              .collect(Collectors.toList());
115            if (!newDocs.isEmpty()) {
116                coll.insertMany(newDocs);
117            }
118        } catch (MongoWriteException e) {
119            throw new DirectoryException(e);
120        }
121    }
122
123    @Override
124    public void addLinks(List<String> sourceIds, String targetId, Session session) {
125        MongoDBSession mongodbSession = (MongoDBSession) session;
126        MongoCollection<Document> coll = getCollection(mongodbSession);
127        List<Document> newDocs = sourceIds.stream()
128                                          .map(sourceId -> buildDoc(sourceId, targetId))
129                                          .filter(doc -> coll.count(doc) == 0)
130                                          .collect(Collectors.toList());
131        if (!newDocs.isEmpty()) {
132            coll.insertMany(newDocs);
133        }
134    }
135
136    @Override
137    public void addLinks(List<String> sourceIds, String targetId) {
138        if (sourceIds == null || sourceIds.isEmpty()) {
139            return;
140        }
141        try (MongoDBSession session = getMongoDBSession()) {
142            addLinks(sourceIds, targetId, session);
143        } catch (MongoWriteException e) {
144            throw new DirectoryException(e);
145        }
146    }
147
148    @Override
149    public void removeLinksForSource(String sourceId) {
150        try (MongoDBSession session = getMongoDBSession()) {
151            removeLinksForSource(sourceId, session);
152        }
153    }
154
155    @Override
156    public void removeLinksForSource(String sourceId, Session session) {
157        removeLinksFor(sourceField, sourceId, (MongoDBSession) session);
158    }
159
160    @Override
161    public void removeLinksForTarget(String targetId) {
162        try (MongoDBSession session = getMongoDBSession()) {
163            removeLinksFor(targetField, targetId, session);
164        }
165    }
166
167    @Override
168    public void removeLinksForTarget(String targetId, Session session) {
169        removeLinksFor(targetField, targetId, (MongoDBSession) session);
170    }
171
172    private void removeLinksFor(String field, String value, MongoDBSession session) {
173        try {
174            DeleteResult result = getCollection(session)
175                                         .deleteMany(MongoDBSerializationHelper.fieldMapToBson(field, value));
176            if (!result.wasAcknowledged()) {
177                throw new DirectoryException(
178                        "Error while deleting the entry, the request has not been acknowledged by the server");
179            }
180        } catch (MongoWriteException e) {
181            throw new DirectoryException(e);
182        }
183    }
184
185    @Override
186    public List<String> getTargetIdsForSource(String sourceId) {
187        try (MongoDBSession session = getMongoDBSession()) {
188            return getIdsFor(sourceField, sourceId, targetField, session);
189        }
190    }
191
192    /**
193     * Retrieves all target ids associated to the given source id
194     *
195     * @param sourceId the source id
196     * @param session the mongoDB session
197     * @return the list of target ids
198     */
199    public List<String> getTargetIdsForSource(String sourceId, MongoDBSession session) {
200        return getIdsFor(sourceField, sourceId, targetField, session);
201    }
202
203    @Override
204    public List<String> getSourceIdsForTarget(String targetId) {
205        try (MongoDBSession session = getMongoDBSession()) {
206            return getIdsFor(targetField, targetId, sourceField, session);
207        }
208    }
209
210    private List<String> getIdsFor(String queryField, String value, String resultField, MongoDBSession session) {
211        FindIterable<Document> docs = getCollection(session)
212                                             .find(MongoDBSerializationHelper.fieldMapToBson(queryField, value));
213        return StreamSupport.stream(docs.spliterator(), false)
214                            .map(doc -> doc.getString(resultField))
215                            .collect(Collectors.toList());
216    }
217
218    @Override
219    public void setTargetIdsForSource(String sourceId, List<String> targetIds) {
220        try (MongoDBSession session = getMongoDBSession()) {
221            setTargetIdsForSource(sourceId, targetIds, session);
222        }
223    }
224
225    @Override
226    public void setTargetIdsForSource(String sourceId, List<String> targetIds, Session session) {
227        setIdsFor(sourceField, sourceId, targetField, targetIds, (MongoDBSession) session);
228    }
229
230    @Override
231    public void setSourceIdsForTarget(String targetId, List<String> sourceIds) {
232        try (MongoDBSession session = getMongoDBSession()) {
233            setIdsFor(targetField, targetId, sourceField, sourceIds, session);
234        }
235    }
236
237    @Override
238    public void setSourceIdsForTarget(String targetId, List<String> sourceIds, Session session) {
239        setIdsFor(targetField, targetId, sourceField, sourceIds, (MongoDBSession) session);
240    }
241
242    private void setIdsFor(String field, String value, String fieldToUpdate, List<String> ids, MongoDBSession session) {
243        Set<String> idsToAdd = new HashSet<>();
244        if (ids != null) {
245            idsToAdd.addAll(ids);
246        }
247        List<String> idsToDelete = new ArrayList<>();
248
249        List<String> existingIds = getIdsFor(field, value, fieldToUpdate, session);
250        for (String id : existingIds) {
251            if (!idsToAdd.remove(id)) {
252                idsToDelete.add(id);
253            }
254        }
255
256        if (!idsToDelete.isEmpty()) {
257            BasicDBList list = new BasicDBList();
258            if (sourceField.equals(field)) {
259                list.addAll(idsToDelete.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList()));
260            } else {
261                list.addAll(idsToDelete.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList()));
262            }
263            Bson deleteDoc = new BasicDBObject("$or", list);
264            getCollection(session).deleteMany(deleteDoc);
265        }
266
267        if (!idsToAdd.isEmpty()) {
268            List<Document> list;
269            if (sourceField.equals(field)) {
270                list = idsToAdd.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList());
271            } else {
272                list = idsToAdd.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList());
273            }
274            getCollection(session).insertMany(list);
275        }
276    }
277
278    private Document buildDoc(String sourceId, String targetId) {
279        Map<String, Object> fieldMap = new HashMap<>();
280        fieldMap.put(sourceField, sourceId);
281        fieldMap.put(targetField, targetId);
282        return MongoDBSerializationHelper.fieldMapToBson(fieldMap);
283    }
284
285    protected void initialize(MongoDBSession session) {
286        if (dataFileName != null) {
287            // fake schema for DirectoryCSVLoader.loadData
288            SchemaImpl schema = new SchemaImpl(collection, null);
289            schema.addField(sourceField, StringType.INSTANCE, null, 0, Collections.emptySet());
290            schema.addField(targetField, StringType.INSTANCE, null, 0, Collections.emptySet());
291            Consumer<Map<String, Object>> loader = map -> {
292                Document doc = MongoDBSerializationHelper.fieldMapToBson(map);
293                MongoCollection<Document> coll = getCollection(session);
294                if (coll.countDocuments(doc) == 0) {
295                    coll.insertOne(doc);
296                }
297            };
298            DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR,
299                    schema, loader);
300        }
301    }
302
303    protected MongoDBSession getMongoDBSession() {
304        return (MongoDBSession) getSourceDirectory().getSession();
305    }
306
307    protected MongoCollection<Document> getCollection(MongoDBSession session) {
308        return session.getDirectory().database.getCollection(collection);
309    }
310
311}