001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 *
019 */
020
021package org.nuxeo.directory.mongodb;
022
023import com.mongodb.BasicDBList;
024import com.mongodb.BasicDBObject;
025import com.mongodb.MongoWriteException;
026import com.mongodb.client.FindIterable;
027import com.mongodb.client.result.DeleteResult;
028import org.bson.Document;
029import org.bson.conversions.Bson;
030import org.nuxeo.common.xmap.annotation.XNode;
031import org.nuxeo.common.xmap.annotation.XObject;
032import org.nuxeo.ecm.core.schema.types.SchemaImpl;
033import org.nuxeo.ecm.core.schema.types.primitives.StringType;
034import org.nuxeo.ecm.directory.AbstractReference;
035import org.nuxeo.ecm.directory.BaseDirectoryDescriptor;
036import org.nuxeo.ecm.directory.DirectoryCSVLoader;
037import org.nuxeo.ecm.directory.DirectoryException;
038import org.nuxeo.ecm.directory.Reference;
039import org.nuxeo.mongodb.core.MongoDBSerializationHelper;
040
041import java.util.ArrayList;
042import java.util.Collections;
043import java.util.HashMap;
044import java.util.HashSet;
045import java.util.List;
046import java.util.Map;
047import java.util.Set;
048import java.util.stream.Collectors;
049import java.util.stream.StreamSupport;
050
051/**
052 * MongoDB implementation of a {@link Reference}
053 * 
054 * @since 9.1
055 */
056@XObject("reference")
057public class MongoDBReference extends AbstractReference implements Cloneable {
058
059    @XNode("@collection")
060    protected String collection;
061
062    @XNode("@sourceField")
063    protected String sourceField;
064
065    @XNode("@targetField")
066    protected String targetField;
067
068    @XNode("@dataFile")
069    protected String dataFileName;
070
071    private boolean initialized;
072
073    @XNode("@field")
074    public void setFieldName(String fieldName) {
075        this.fieldName = fieldName;
076    }
077
078    @Override
079    @XNode("@directory")
080    public void setTargetDirectoryName(String targetDirectoryName) {
081        this.targetDirectoryName = targetDirectoryName;
082    }
083
084    @Override
085    public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException {
086        if (targetIds == null) {
087            return;
088        }
089        try (MongoDBSession session = getMongoDBSession()) {
090            addLinks(sourceId, targetIds, session);
091        }
092    }
093
094    /**
095     * Adds the links between the source id and the target ids
096     * 
097     * @param sourceId the source id
098     * @param targetIds the target ids
099     * @param session the mongoDB session
100     * @throws DirectoryException
101     */
102    public void addLinks(String sourceId, List<String> targetIds, MongoDBSession session) throws DirectoryException {
103        if (targetIds == null) {
104            return;
105        }
106        try {
107            List<Document> newDocs = targetIds.stream()
108                                              .map(targetId -> buildDoc(sourceId, targetId))
109                                              .filter(doc -> session.getCollection(collection).count(doc) == 0)
110                                              .collect(Collectors.toList());
111            session.getCollection(collection).insertMany(newDocs);
112        } catch (MongoWriteException e) {
113            throw new DirectoryException(e);
114        }
115    }
116
117    @Override
118    public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException {
119        if (sourceIds == null) {
120            return;
121        }
122        try (MongoDBSession session = getMongoDBSession()) {
123            List<Document> newDocs = sourceIds.stream()
124                                              .map(sourceId -> buildDoc(sourceId, targetId))
125                                              .filter(doc -> session.getCollection(collection).count(doc) == 0)
126                                              .collect(Collectors.toList());
127            session.getCollection(collection).insertMany(newDocs);
128        } catch (MongoWriteException e) {
129            throw new DirectoryException(e);
130        }
131    }
132
133    @Override
134    public void removeLinksForSource(String sourceId) throws DirectoryException {
135        try (MongoDBSession session = getMongoDBSession()) {
136            removeLinksForSource(sourceId, session);
137        }
138    }
139
140    /**
141     * Removes all the links for a given source id
142     * 
143     * @param sourceId the source id
144     * @param session the mongoDB session
145     */
146    public void removeLinksForSource(String sourceId, MongoDBSession session) {
147        removeLinksFor(sourceField, sourceId, session);
148    }
149
150    @Override
151    public void removeLinksForTarget(String targetId) throws DirectoryException {
152        try (MongoDBSession session = getMongoDBSession()) {
153            removeLinksFor(targetField, targetId, session);
154        }
155    }
156
157    private void removeLinksFor(String field, String value, MongoDBSession session) {
158        try {
159            DeleteResult result = session.getCollection(collection)
160                                         .deleteMany(MongoDBSerializationHelper.fieldMapToBson(field, value));
161            if (!result.wasAcknowledged()) {
162                throw new DirectoryException(
163                        "Error while deleting the entry, the request has not been acknowledged by the server");
164            }
165        } catch (MongoWriteException e) {
166            throw new DirectoryException(e);
167        }
168    }
169
170    @Override
171    public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException {
172        try (MongoDBSession session = getMongoDBSession()) {
173            return getIdsFor(sourceField, sourceId, targetField, session);
174        }
175    }
176
177    /**
178     * Retrieves all target ids associated to the given source id
179     * 
180     * @param sourceId the source id
181     * @param session the mongoDB session
182     * @return the list of target ids
183     * @throws DirectoryException
184     */
185    public List<String> getTargetIdsForSource(String sourceId, MongoDBSession session) throws DirectoryException {
186        return getIdsFor(sourceField, sourceId, targetField, session);
187    }
188
189    @Override
190    public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException {
191        try (MongoDBSession session = getMongoDBSession()) {
192            return getIdsFor(targetField, targetId, sourceField, session);
193        }
194    }
195
196    private List<String> getIdsFor(String queryField, String value, String resultField, MongoDBSession session) {
197        FindIterable<Document> docs = session.getCollection(collection)
198                                             .find(MongoDBSerializationHelper.fieldMapToBson(queryField, value));
199        List<String> ids = StreamSupport.stream(docs.spliterator(), false)
200                                        .map(doc -> doc.getString(resultField))
201                                        .collect(Collectors.toList());
202        return ids;
203    }
204
205    @Override
206    public void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException {
207        try (MongoDBSession session = getMongoDBSession()) {
208            setTargetIdsForSource(sourceId, targetIds, session);
209        }
210    }
211
212    /**
213     * Sets all target ids to be associated to the given source id
214     * 
215     * @param sourceId the source id
216     * @param targetIds the target ids
217     * @param session the mongoDB session
218     * @throws DirectoryException
219     */
220    public void setTargetIdsForSource(String sourceId, List<String> targetIds, MongoDBSession session)
221            throws DirectoryException {
222        setIdsFor(sourceField, sourceId, targetField, targetIds, session);
223    }
224
225    @Override
226    public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException {
227        try (MongoDBSession session = getMongoDBSession()) {
228            setIdsFor(targetField, targetId, sourceField, sourceIds, session);
229        }
230    }
231
232    private void setIdsFor(String field, String value, String fieldToUpdate, List<String> ids, MongoDBSession session) {
233        Set<String> idsToAdd = new HashSet<>();
234        if (ids != null) {
235            idsToAdd.addAll(ids);
236        }
237        List<String> idsToDelete = new ArrayList<>();
238
239        List<String> existingIds = getIdsFor(field, value, fieldToUpdate, session);
240        for (String id : existingIds) {
241            if (!idsToAdd.remove(id)) {
242                idsToDelete.add(id);
243            }
244        }
245
246        if (!idsToDelete.isEmpty()) {
247            BasicDBList list = new BasicDBList();
248            if (sourceField.equals(field)) {
249                list.addAll(idsToDelete.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList()));
250            } else {
251                list.addAll(idsToDelete.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList()));
252            }
253            Bson deleteDoc = new BasicDBObject("$or", list);
254            session.getCollection(collection).deleteMany(deleteDoc);
255        }
256
257        if (!idsToAdd.isEmpty()) {
258            List<Document> list;
259            if (sourceField.equals(field)) {
260                list = idsToAdd.stream().map(id -> buildDoc(value, id)).collect(Collectors.toList());
261            } else {
262                list = idsToAdd.stream().map(id -> buildDoc(id, value)).collect(Collectors.toList());
263            }
264            session.getCollection(collection).insertMany(list);
265        }
266    }
267
268    private Document buildDoc(String sourceId, String targetId) {
269        Map<String, Object> fieldMap = new HashMap<>();
270        fieldMap.put(sourceField, sourceId);
271        fieldMap.put(targetField, targetId);
272        return MongoDBSerializationHelper.fieldMapToBson(fieldMap);
273    }
274
275    @Override
276    public MongoDBReference clone() {
277        MongoDBReference clone = (MongoDBReference) super.clone();
278        return clone;
279    }
280
281    protected MongoDBSession getMongoDBSession() throws DirectoryException {
282        if (!initialized) {
283            if (dataFileName != null) {
284                try (MongoDBSession session = (MongoDBSession) getSourceDirectory().getSession()) {
285                    // fake schema for DirectoryCSVLoader.loadData
286                    SchemaImpl schema = new SchemaImpl(collection, null);
287                    schema.addField(sourceField, StringType.INSTANCE, null, 0, Collections.emptySet());
288                    schema.addField(targetField, StringType.INSTANCE, null, 0, Collections.emptySet());
289                    DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR,
290                            schema, map -> session.getCollection(collection).insertOne(MongoDBSerializationHelper.fieldMapToBson(map)));
291                }
292            }
293            initialized = true;
294        }
295        return (MongoDBSession) getSourceDirectory().getSession();
296    }
297
298}