001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nelson Silva <nsilva@nuxeo.com>
018 */
019
020package org.nuxeo.ecm.automation.client.adapters;
021
022import org.apache.commons.collections.CollectionUtils;
023import org.apache.http.Header;
024import org.apache.http.HttpHeaders;
025import org.apache.http.HttpStatus;
026import org.apache.http.client.protocol.HttpClientContext;
027import org.apache.http.protocol.HttpContext;
028import org.nuxeo.ecm.automation.client.AutomationClient;
029import org.nuxeo.ecm.automation.client.LoginInfo;
030import org.nuxeo.ecm.automation.client.OperationRequest;
031import org.nuxeo.ecm.automation.client.RemoteException;
032import org.nuxeo.ecm.automation.client.Session;
033import org.nuxeo.ecm.automation.client.jaxrs.spi.Connector;
034import org.nuxeo.ecm.automation.client.jaxrs.spi.DefaultOperationRequest;
035import org.nuxeo.ecm.automation.client.jaxrs.spi.DefaultSession;
036import org.nuxeo.ecm.automation.client.jaxrs.spi.JsonMarshalling;
037import org.nuxeo.ecm.automation.client.jaxrs.spi.Request;
038import org.nuxeo.ecm.automation.client.jaxrs.util.MultipartInput;
039import org.nuxeo.ecm.automation.client.model.Blob;
040import org.nuxeo.ecm.automation.client.model.Blobs;
041import org.nuxeo.ecm.automation.client.model.OperationDocumentation;
042import org.nuxeo.ecm.automation.client.model.OperationInput;
043
044import java.io.IOException;
045import java.io.InputStream;
046import java.time.Duration;
047import java.util.HashMap;
048import java.util.List;
049import java.util.Map;
050import java.util.concurrent.CompletableFuture;
051import java.util.concurrent.ExecutionException;
052import java.util.concurrent.ExecutorService;
053import java.util.concurrent.Executors;
054import java.util.concurrent.Future;
055
056import static org.nuxeo.ecm.automation.client.Constants.CTYPE_REQUEST_NOCHARSET;
057import static org.nuxeo.ecm.automation.client.Constants.HEADER_NX_SCHEMAS;
058import static org.nuxeo.ecm.automation.client.Constants.REQUEST_ACCEPT_HEADER;
059
060/**
061 * Asynchronous session adapter.
062 * @since 10.3
063 */
064public class AsyncSession implements Session {
065
066    protected static ExecutorService executor = Executors.newSingleThreadExecutor();
067
068    /**
069     * Request providing a completable call method for convenience.
070     */
071    public class CompletableRequest extends Request {
072
073        protected CompletableFuture<CompletableRequest> future;
074
075        protected int status;
076
077        protected Header[] headers;
078
079        protected Object result;
080
081        protected boolean redirected;
082
083        public CompletableRequest(int method, String url) {
084            super(method, url, (String) null);
085        }
086
087        public CompletableRequest(int method, String url, String entity) {
088            super(method, url, entity);
089        }
090
091        public CompletableRequest(int method, String url, MultipartInput input) {
092            super(method, url, input);
093        }
094
095        @Override
096        public Object handleResult(int status, Header[] headers, InputStream stream, HttpContext ctx)
097                throws RemoteException, IOException {
098            this.status = status;
099            this.headers = headers;
100            List redirects = (List) ctx.getAttribute(HttpClientContext.REDIRECT_LOCATIONS);
101            this.redirected = CollectionUtils.isNotEmpty(redirects);
102            try {
103                this.result = super.handleResult(status, headers, stream, ctx);
104                future.complete(this);
105            } catch (RemoteException e) {
106                future.completeExceptionally(e);
107            }
108            return result;
109        }
110
111        protected AsyncSession getSession() {
112            return AsyncSession.this;
113        }
114
115        protected String getHeader(String name) {
116            return Request.getHeaderValue(headers, name);
117        }
118
119        public CompletableFuture<? extends CompletableRequest> call() {
120            future = new CompletableFuture<>();
121            try {
122                getSession().getConnector().execute(this);
123            } catch (IOException e) {
124                future.completeExceptionally(e);
125            }
126            return future;
127        }
128
129        public int getStatus() {
130            return status;
131        }
132
133        public Object getResult() {
134            return result;
135        }
136
137        public boolean isRedirected() {
138            return redirected;
139        }
140    }
141
142    /**
143     * Asynchronous pooling based request
144     */
145    public class AsyncRequest extends CompletableRequest {
146
147        protected static final String ASYNC_ADAPTER = "/@async";
148
149        public AsyncRequest(int method, String url, String entity) {
150            super(method, url + ASYNC_ADAPTER, entity);
151        }
152
153        public AsyncRequest(int method, String url, MultipartInput input) {
154            super(method, url + ASYNC_ADAPTER, input);
155        }
156
157        protected AsyncSession getSession() {
158            return AsyncSession.this;
159        }
160
161        public CompletableFuture<Object> execute() {
162            return call().thenCompose((req) -> {
163                if (req.getStatus() == HttpStatus.SC_ACCEPTED) {
164                    String location = req.getHeader(HttpHeaders.LOCATION);
165                    return poll(location, Duration.ofSeconds(1), Duration.ofSeconds(30));
166                }
167                return CompletableFuture.completedFuture(req.getResult());
168            });
169        }
170
171        protected CompletableFuture<Object> poll(String location, Duration delay, Duration duration) {
172            CompletableFuture<Object> resultFuture = new CompletableFuture<>();
173            long deadline = System.nanoTime() + duration.toNanos();
174            CompletableRequest req = new CompletableRequest(Request.GET, location);
175            Future pollFuture = executor.submit(() -> {
176                do {
177                    req.call().thenAccept(res -> {
178                        if (req.isRedirected()) {
179                            resultFuture.complete(res.getResult());
180                        }
181                    }).exceptionally(ex -> {
182                        resultFuture.completeExceptionally(ex.getCause());
183                        return null;
184                    });
185                    try {
186                        Thread.sleep(delay.toMillis());
187                    } catch (InterruptedException e) {
188                        // interrupted when result is complete
189                        return;
190                    }
191                } while (deadline > System.nanoTime());
192            });
193            resultFuture.whenComplete((result, thrown) -> {
194                pollFuture.cancel(true);
195            });
196            return resultFuture;
197        }
198    }
199
200    protected final DefaultSession session;
201
202    public AsyncSession(DefaultSession session) {
203        this.session = session;
204    }
205
206    public Session getSession() {
207        return session;
208    }
209
210    @Override
211    public AutomationClient getClient() {
212        return session.getClient();
213    }
214
215    @Override
216    public LoginInfo getLogin() {
217        return session.getLogin();
218    }
219
220    @Override
221    public OperationRequest newRequest(String id) {
222        return newRequest(id, new HashMap<>());
223    }
224
225    @Override
226    public OperationRequest newRequest(String id, Map<String, Object> ctx) {
227        OperationDocumentation op = getOperation(id);
228        if (op == null) {
229            throw new IllegalArgumentException("No such operation: " + id);
230        }
231        return new DefaultOperationRequest(this, op, ctx);
232    }
233
234    @Override
235    public Object execute(OperationRequest request) throws IOException {
236        AsyncRequest req;
237        String content = JsonMarshalling.writeRequest(request);
238        String ctype;
239        Object input = request.getInput();
240        if (input instanceof OperationInput && ((OperationInput) input).isBinary()) {
241            MultipartInput mpinput = Request.buildMultipartInput(input, content);
242            req = new AsyncRequest(Request.POST, request.getUrl(), mpinput);
243            ctype = mpinput.getContentType();
244        } else {
245            req = new AsyncRequest(Request.POST, request.getUrl(), content);
246            ctype = CTYPE_REQUEST_NOCHARSET;
247        }
248        // set headers
249        for (Map.Entry<String, String> entry : request.getHeaders().entrySet()) {
250            req.put(entry.getKey(), entry.getValue());
251        }
252        req.put(HttpHeaders.ACCEPT, REQUEST_ACCEPT_HEADER);
253        req.put(HttpHeaders.CONTENT_TYPE, ctype);
254        if (req.get(HEADER_NX_SCHEMAS) == null && session.getDefaultSchemas() != null) {
255            req.put(HEADER_NX_SCHEMAS, session.getDefaultSchemas());
256        }
257        try {
258            return req.execute().get();
259        } catch (ExecutionException e) {
260            if (e.getCause() instanceof RemoteException) {
261                throw (RemoteException) e.getCause();
262            }
263            throw new IOException(e);
264        } catch (InterruptedException e) {
265            Thread.currentThread().interrupt();
266            throw new RuntimeException(e);
267        }
268    }
269
270    @Override
271    public Blob getFile(String path) throws IOException {
272        return session.getFile(path);
273    }
274
275    @Override
276    public Blobs getFiles(String path) throws IOException {
277        return session.getFiles(path);
278    }
279
280    @Override
281    public OperationDocumentation getOperation(String id) {
282        return session.getOperation(id);
283    }
284
285    @Override
286    public Map<String, OperationDocumentation> getOperations() {
287        return session.getOperations();
288    }
289
290    @Override
291    public <T> T getAdapter(Class<T> type) {
292        return session.getAdapter(type);
293    }
294
295    @Override
296    public String getDefaultSchemas() {
297        return session.getDefaultSchemas();
298    }
299
300    @Override
301    public void setDefaultSchemas(String defaultSchemas) {
302        session.setDefaultSchemas(defaultSchemas);
303    }
304
305    @Override
306    public void close() {
307        session.close();
308    }
309
310    public Connector getConnector() {
311        return session.getConnector();
312    }
313}