001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.elasticsearch.api;
020
021import java.io.Serializable;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.NoSuchElementException;
025
026import org.elasticsearch.search.SearchHit;
027import org.nuxeo.ecm.core.api.IterableQueryResult;
028import org.nuxeo.elasticsearch.core.EsSearchHitConverter;
029import org.nuxeo.elasticsearch.query.NxQueryBuilder;
030
031/**
032 * Iterable query result of results of an ElasticSearch scroll query and next ones.
033 * <p>
034 * Queries ElasticSearch when there's no more result in current response and there's more in cluster.
035 * <p>
036 * For better performance use {@link NxQueryBuilder#onlyElasticsearchResponse()} for the first scroll requests.
037 *
038 * @since 8.4
039 */
040public class EsIterableQueryResultImpl implements IterableQueryResult, Iterator<Map<String, Serializable>> {
041
042    private final ElasticSearchService searchService;
043
044    private EsScrollResult scrollResult;
045
046    private final EsSearchHitConverter converter;
047
048    private final long size;
049
050    private boolean closed;
051
052    private long pos;
053
054    private int relativePos;
055
056    public EsIterableQueryResultImpl(ElasticSearchService searchService, EsScrollResult scrollResult) {
057        assert !scrollResult.getQueryBuilder().getSelectFieldsAndTypes().isEmpty();
058        this.searchService = searchService;
059        this.scrollResult = scrollResult;
060        this.converter = new EsSearchHitConverter(scrollResult.getQueryBuilder().getSelectFieldsAndTypes());
061        this.size = scrollResult.getElasticsearchResponse().getHits().getTotalHits();
062    }
063
064    @Override
065    public void close() {
066        if (!closed) {
067            searchService.clearScroll(scrollResult);
068            closed = true;
069            pos = -1;
070        }
071    }
072
073    @Override
074    public boolean isLife() {
075        return mustBeClosed();
076    }
077
078    @Override
079    public boolean mustBeClosed() {
080        return true;
081    }
082
083    @Override
084    public long size() {
085        return size;
086    }
087
088    @Override
089    public long pos() {
090        return pos;
091    }
092
093    @Override
094    public void skipTo(long pos) {
095        checkNotClosed();
096        if (pos < this.pos) {
097            throw new IllegalArgumentException("Cannot go back in Iterable.");
098        } else if (pos > size) {
099            pos = size;
100        } else {
101            while (pos > this.pos) {
102                nextHit();
103            }
104        }
105        this.pos = pos;
106    }
107
108    @Override
109    public Iterator<Map<String, Serializable>> iterator() {
110        return this;
111    }
112
113    @Override
114    public boolean hasNext() {
115        checkNotClosed();
116        return pos < size;
117    }
118
119    @Override
120    public Map<String, Serializable> next() {
121        checkNotClosed();
122        if (pos == size) {
123            throw new NoSuchElementException();
124        }
125        SearchHit hit = nextHit();
126        return converter.convert(hit);
127    }
128
129    private SearchHit nextHit() {
130        if (relativePos == scrollResult.getElasticsearchResponse().getHits().getHits().length) {
131            // Retrieve next scroll
132            scrollResult = searchService.scroll(scrollResult);
133            relativePos = 0;
134        }
135        SearchHit hit = scrollResult.getElasticsearchResponse().getHits().getAt(relativePos);
136        relativePos++;
137        pos++;
138        return hit;
139    }
140
141    private void checkNotClosed() {
142        if (closed) {
143            throw new IllegalStateException("Query results iterator closed.");
144        }
145    }
146
147}