001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.fetcher;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import org.elasticsearch.action.search.SearchResponse;
029import org.elasticsearch.search.SearchHit;
030import org.nuxeo.ecm.core.api.CoreInstance;
031import org.nuxeo.ecm.core.api.CoreSession;
032import org.nuxeo.ecm.core.api.DocumentModel;
033import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
034import org.nuxeo.ecm.core.query.sql.NXQL;
035
036/**
037 * @since 6.0
038 */
039public class VcsFetcher extends Fetcher {
040
041    private static final int CHUNK_SIZE = 100;
042
043    public VcsFetcher(CoreSession session, SearchResponse response, Map<String, String> repoNames) {
044        super(session, response, repoNames);
045    }
046
047    @Override
048    public DocumentModelListImpl fetchDocuments() {
049        Map<String, List<String>> repoHits = getHitsPerRepository();
050        List<DocumentModel> docs = new ArrayList<>();
051        String openSessionRepository = getSession().getRepositoryName();
052        boolean closeSession;
053        CoreSession session;
054        for (String repo : repoHits.keySet()) {
055            if (openSessionRepository.equals(repo)) {
056                session = getSession();
057                closeSession = false;
058            } else {
059                session = CoreInstance.openCoreSession(repo);
060                closeSession = true;
061            }
062            try {
063                docs.addAll(fetchFromVcs(repoHits.get(repo), session));
064            } finally {
065                if (closeSession) {
066                    session.close();
067                }
068            }
069        }
070        sortResults(docs);
071        DocumentModelListImpl ret = new DocumentModelListImpl(docs.size());
072        if (!docs.isEmpty()) {
073            ret.addAll(docs);
074        }
075        return ret;
076    }
077
078    private Map<String, List<String>> getHitsPerRepository() {
079        Map<String, List<String>> ret = new HashMap<>();
080        for (SearchHit hit : getResponse().getHits()) {
081            String repoName = getRepoForIndex(hit.getIndex());
082            List<String> docIds = ret.get(repoName);
083            if (docIds == null) {
084                docIds = new ArrayList<>();
085                ret.put(repoName, docIds);
086            }
087            docIds.add(hit.getId());
088        }
089        return ret;
090    }
091
092    private List<DocumentModel> fetchFromVcs(List<String> ids, CoreSession session) {
093        List<DocumentModel> ret = null;
094        int size = ids.size();
095        int start = 0;
096        int end = Math.min(CHUNK_SIZE, size);
097        boolean done = false;
098        while (!done) {
099            List<DocumentModel> docs = fetchFromVcsChunk(ids.subList(start, end), session);
100            if (ret == null) {
101                ret = docs;
102            } else {
103                ret.addAll(docs);
104            }
105            if (end >= ids.size()) {
106                done = true;
107            } else {
108                start = end;
109                end = Math.min(start + CHUNK_SIZE, size);
110            }
111        }
112        return ret;
113    }
114
115    private List<DocumentModel> fetchFromVcsChunk(final List<String> ids, CoreSession session)
116    {
117        StringBuilder sb = new StringBuilder();
118        sb.append("SELECT * FROM Document, Relation WHERE ecm:uuid IN (");
119        for (int i = 0; i < ids.size(); i++) {
120            sb.append(NXQL.escapeString(ids.get(i)));
121            if (i < ids.size() - 1) {
122                sb.append(", ");
123            }
124        }
125        sb.append(")");
126        return session.query(sb.toString());
127    }
128
129    private void sortResults(List<DocumentModel> docs) {
130        final List<String> ids = new ArrayList<>();
131        for (SearchHit hit : getResponse().getHits()) {
132            ids.add(getRepoForIndex(hit.getIndex()) + hit.getId());
133        }
134
135        Collections.sort(docs, new Comparator<DocumentModel>() {
136            @Override
137            public int compare(DocumentModel a, DocumentModel b) {
138                return ids.indexOf(a.getRepositoryName() + a.getId()) - ids.indexOf(b.getRepositoryName() + b.getId());
139            }
140        });
141
142    }
143
144}