001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     bdelbosc
016 */
017package org.nuxeo.elasticsearch.fetcher;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025
026import org.elasticsearch.action.search.SearchResponse;
027import org.elasticsearch.search.SearchHit;
028import org.nuxeo.ecm.core.api.CoreInstance;
029import org.nuxeo.ecm.core.api.CoreSession;
030import org.nuxeo.ecm.core.api.DocumentModel;
031import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
032import org.nuxeo.ecm.core.query.sql.NXQL;
033
034/**
035 * @since 6.0
036 */
037public class VcsFetcher extends Fetcher {
038
039    private static final int CHUNK_SIZE = 100;
040
041    public VcsFetcher(CoreSession session, SearchResponse response, Map<String, String> repoNames) {
042        super(session, response, repoNames);
043    }
044
045    @Override
046    public DocumentModelListImpl fetchDocuments() {
047        Map<String, List<String>> repoHits = getHitsPerRepository();
048        List<DocumentModel> docs = new ArrayList<>();
049        String openSessionRepository = getSession().getRepositoryName();
050        boolean closeSession;
051        CoreSession session;
052        for (String repo : repoHits.keySet()) {
053            if (openSessionRepository.equals(repo)) {
054                session = getSession();
055                closeSession = false;
056            } else {
057                session = CoreInstance.openCoreSession(repo);
058                closeSession = true;
059            }
060            try {
061                docs.addAll(fetchFromVcs(repoHits.get(repo), session));
062            } finally {
063                if (closeSession) {
064                    session.close();
065                }
066            }
067        }
068        sortResults(docs);
069        DocumentModelListImpl ret = new DocumentModelListImpl(docs.size());
070        if (!docs.isEmpty()) {
071            ret.addAll(docs);
072        }
073        return ret;
074    }
075
076    private Map<String, List<String>> getHitsPerRepository() {
077        Map<String, List<String>> ret = new HashMap<>();
078        for (SearchHit hit : getResponse().getHits()) {
079            String repoName = getRepoForIndex(hit.getIndex());
080            List<String> docIds = ret.get(repoName);
081            if (docIds == null) {
082                docIds = new ArrayList<>();
083                ret.put(repoName, docIds);
084            }
085            docIds.add(hit.getId());
086        }
087        return ret;
088    }
089
090    private List<DocumentModel> fetchFromVcs(List<String> ids, CoreSession session) {
091        List<DocumentModel> ret = null;
092        int size = ids.size();
093        int start = 0;
094        int end = Math.min(CHUNK_SIZE, size);
095        boolean done = false;
096        while (!done) {
097            List<DocumentModel> docs = fetchFromVcsChunk(ids.subList(start, end), session);
098            if (ret == null) {
099                ret = docs;
100            } else {
101                ret.addAll(docs);
102            }
103            if (end >= ids.size()) {
104                done = true;
105            } else {
106                start = end;
107                end = Math.min(start + CHUNK_SIZE, size);
108            }
109        }
110        return ret;
111    }
112
113    private List<DocumentModel> fetchFromVcsChunk(final List<String> ids, CoreSession session)
114    {
115        StringBuilder sb = new StringBuilder();
116        sb.append("SELECT * FROM Document, Relation WHERE ecm:uuid IN (");
117        for (int i = 0; i < ids.size(); i++) {
118            sb.append(NXQL.escapeString(ids.get(i)));
119            if (i < ids.size() - 1) {
120                sb.append(", ");
121            }
122        }
123        sb.append(")");
124        return session.query(sb.toString());
125    }
126
127    private void sortResults(List<DocumentModel> docs) {
128        final List<String> ids = new ArrayList<>();
129        for (SearchHit hit : getResponse().getHits()) {
130            ids.add(getRepoForIndex(hit.getIndex()) + hit.getId());
131        }
132
133        Collections.sort(docs, new Comparator<DocumentModel>() {
134            @Override
135            public int compare(DocumentModel a, DocumentModel b) {
136                return ids.indexOf(a.getRepositoryName() + a.getId()) - ids.indexOf(b.getRepositoryName() + b.getId());
137            }
138        });
139
140    }
141
142}