001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.elasticsearch.api;
020
021import java.io.Serializable;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.NoSuchElementException;
025
026import org.elasticsearch.search.SearchHit;
027import org.nuxeo.ecm.core.api.IterableQueryResult;
028import org.nuxeo.elasticsearch.core.EsSearchHitConverter;
029import org.nuxeo.elasticsearch.query.NxQueryBuilder;
030
031/**
032 * Iterable query result of results of an ElasticSearch scroll query and next ones.
033 * <p>
034 * Queries ElasticSearch when there's no more result in current response and there's more in cluster.
035 * <p>
036 * For better performance use {@link NxQueryBuilder#onlyElasticsearchResponse()} for the first scroll requests.
037 *
038 * @since 8.4
039 */
040public class EsIterableQueryResultImpl implements IterableQueryResult, Iterator<Map<String, Serializable>> {
041
042    private final ElasticSearchService searchService;
043
044    private final EsSearchHitConverter converter;
045
046    private final long size;
047
048    private EsScrollResult scrollResult;
049
050    private boolean closed;
051
052    private long pos;
053
054    private int relativePos;
055
056    public EsIterableQueryResultImpl(ElasticSearchService searchService, EsScrollResult scrollResult) {
057        assert !scrollResult.getQueryBuilder().getSelectFieldsAndTypes().isEmpty();
058        this.searchService = searchService;
059        this.scrollResult = scrollResult;
060        this.converter = new EsSearchHitConverter(scrollResult.getQueryBuilder().getSelectFieldsAndTypes());
061        this.size = scrollResult.getElasticsearchResponse().getHits().getTotalHits();
062    }
063
064    @Override
065    public void close() {
066        if (!closed) {
067            searchService.clearScroll(scrollResult);
068            closed = true;
069            pos = -1;
070        }
071    }
072
073    @SuppressWarnings("deprecation")
074    @Override
075    public boolean isLife() {
076        return mustBeClosed();
077    }
078
079    @Override
080    public boolean mustBeClosed() {
081        return true;
082    }
083
084    @Override
085    public long size() {
086        return size;
087    }
088
089    @Override
090    public long pos() {
091        return pos;
092    }
093
094    @Override
095    public void skipTo(long pos) {
096        checkNotClosed();
097        if (pos < this.pos) {
098            throw new IllegalArgumentException("Cannot go back in Iterable.");
099        } else if (pos > size) {
100            pos = size;
101        } else {
102            while (pos > this.pos) {
103                nextHit();
104            }
105        }
106        this.pos = pos;
107    }
108
109    @Override
110    public Iterator<Map<String, Serializable>> iterator() {
111        return this;
112    }
113
114    @Override
115    public boolean hasNext() {
116        checkNotClosed();
117        return pos < size;
118    }
119
120    @Override
121    public Map<String, Serializable> next() {
122        checkNotClosed();
123        if (pos == size) {
124            throw new NoSuchElementException();
125        }
126        SearchHit hit = nextHit();
127        return converter.convert(hit);
128    }
129
130    private SearchHit nextHit() {
131        if (relativePos == scrollResult.getElasticsearchResponse().getHits().getHits().length) {
132            // Retrieve next scroll
133            scrollResult = searchService.scroll(scrollResult);
134            relativePos = 0;
135        }
136        SearchHit hit = scrollResult.getElasticsearchResponse().getHits().getAt(relativePos);
137        relativePos++;
138        pos++;
139        return hit;
140    }
141
142    private void checkNotClosed() {
143        if (closed) {
144            throw new IllegalStateException("Query results iterator closed.");
145        }
146    }
147
148}