001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.ecm.automation.server.jaxrs.batch;
020
021import java.io.File;
022import java.io.FileOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.io.Serializable;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.collections.CollectionUtils;
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.commons.lang3.math.NumberUtils;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.Blob;
041import org.nuxeo.ecm.core.api.Blobs;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.transientstore.api.TransientStore;
044import org.nuxeo.runtime.api.Framework;
045
046/**
047 * Represents a batch file backed by the {@link TransientStore}.
048 * <p>
049 * The file can be chunked or not. If it is chunked it references its chunks as {@link TransientStore} entry keys.
050 *
051 * @since 7.4
052 * @see Batch
053 */
054public class BatchFileEntry {
055
056    protected static final Log log = LogFactory.getLog(BatchFileEntry.class);
057
058    protected String key;
059
060    protected Map<String, Serializable> params;
061
062    protected Blob blob;
063
064    protected Blob chunkedBlob;
065
066    /**
067     * Returns a file entry that holds the given blob, not chunked.
068     */
069    public BatchFileEntry(String key, Blob blob) {
070        this(key, false);
071        this.blob = blob;
072    }
073
074    /**
075     * Returns a file entry that references the file chunks.
076     */
077    public BatchFileEntry(String key, int chunkCount, String fileName, String mimeType, long fileSize) {
078        this(key, true);
079        params.put("chunkCount", String.valueOf(chunkCount));
080        if (!StringUtils.isEmpty(fileName)) {
081            params.put("fileName", fileName);
082        }
083        if (!StringUtils.isEmpty(mimeType)) {
084            params.put("mimeType", mimeType);
085        }
086        params.put("fileSize", String.valueOf(fileSize));
087    }
088
089    /**
090     * Returns a file entry that holds the given parameters.
091     */
092    public BatchFileEntry(String key, Map<String, Serializable> params) {
093        this.key = key;
094        this.params = params;
095    }
096
097    protected BatchFileEntry(String key, boolean chunked) {
098        this.key = key;
099        params = new HashMap<>();
100        params.put(Batch.CHUNKED_PARAM_NAME, String.valueOf(chunked));
101    }
102
103    public String getKey() {
104        return key;
105    }
106
107    public Map<String, Serializable> getParams() {
108        return params;
109    }
110
111    public boolean isChunked() {
112        return Boolean.parseBoolean((String) params.get(Batch.CHUNKED_PARAM_NAME));
113    }
114
115    public String getFileName() {
116        if (isChunked()) {
117            return (String) params.get("fileName");
118        } else {
119            Blob blob = getBlob();
120            if (blob == null) {
121                return null;
122            } else {
123                return blob.getFilename();
124            }
125        }
126    }
127
128    public String getMimeType() {
129        if (isChunked()) {
130            return (String) params.get("mimeType");
131        } else {
132            Blob blob = getBlob();
133            if (blob == null) {
134                return null;
135            } else {
136                return blob.getMimeType();
137            }
138        }
139    }
140
141    public long getFileSize() {
142        if (isChunked()) {
143            return Long.parseLong((String) params.get("fileSize"));
144        } else {
145            Blob blob = getBlob();
146            if (blob == null) {
147                return -1;
148            } else {
149                return blob.getLength();
150            }
151        }
152    }
153
154    public int getChunkCount() {
155        if (!isChunked()) {
156            throw new NuxeoException(
157                    String.format("Cannot get chunk count of file entry %s as it is not chunked", key));
158        }
159        return Integer.parseInt((String) params.get("chunkCount"));
160    }
161
162    public Map<Integer, String> getChunks() {
163        if (!isChunked()) {
164            throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", key));
165        }
166        Map<Integer, String> chunks = new HashMap<>();
167        for (String param : params.keySet()) {
168            if (NumberUtils.isDigits(param)) {
169                chunks.put(Integer.valueOf(param), (String) params.get(param));
170            }
171        }
172        return chunks;
173    }
174
175    public List<Integer> getOrderedChunkIndexes() {
176        if (!isChunked()) {
177            throw new NuxeoException(
178                    String.format("Cannot get chunk indexes of file entry %s as it is not chunked", key));
179        }
180        List<Integer> sortedChunkIndexes = new ArrayList<>(getChunks().keySet());
181        Collections.sort(sortedChunkIndexes);
182        return sortedChunkIndexes;
183    }
184
185    public Collection<String> getChunkEntryKeys() {
186        if (!isChunked()) {
187            throw new NuxeoException(
188                    String.format("Cannot get chunk entry keys of file entry %s as it is not chunked", key));
189        }
190        return getChunks().values();
191    }
192
193    public boolean isChunksCompleted() {
194        return getChunks().size() == getChunkCount();
195    }
196
197    public Blob getBlob() {
198        if (isChunked()) {
199            // First check if blob chunks have already been read and concatenated
200            if (chunkedBlob != null) {
201                return chunkedBlob;
202            }
203            File tmpChunkedFile = null;
204            try {
205                Map<Integer, String> chunks = getChunks();
206                int uploadedChunkCount = chunks.size();
207                int chunkCount = getChunkCount();
208                if (uploadedChunkCount != chunkCount) {
209                    log.warn(String.format(
210                            "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.", key,
211                            uploadedChunkCount, chunkCount));
212                    return null;
213                }
214                chunkedBlob = Blobs.createBlobWithExtension(null);
215                // Temporary file made from concatenated chunks
216                tmpChunkedFile = chunkedBlob.getFile();
217                BatchManager bm = Framework.getService(BatchManager.class);
218                TransientStore ts = bm.getTransientStore();
219                // Sort chunk indexes and concatenate them to build the entire blob
220                List<Integer> sortedChunkIndexes = getOrderedChunkIndexes();
221                for (int index : sortedChunkIndexes) {
222                    Blob chunk = getChunk(ts, chunks.get(index));
223                    if (chunk != null) {
224                        transferTo(chunk, tmpChunkedFile);
225                    }
226                }
227                // Store tmpChunkedFile as a parameter for later deletion
228                ts.putParameter(key, "tmpChunkedFilePath", tmpChunkedFile.getAbsolutePath());
229                chunkedBlob.setMimeType(getMimeType());
230                chunkedBlob.setFilename(getFileName());
231                return chunkedBlob;
232            } catch (IOException ioe) {
233                if (tmpChunkedFile != null && tmpChunkedFile.exists()) {
234                    tmpChunkedFile.delete();
235                }
236                chunkedBlob = null;
237                throw new NuxeoException(ioe);
238            }
239        } else {
240            return blob;
241        }
242    }
243
244    protected Blob getChunk(TransientStore ts, String key) {
245        List<Blob> blobs = ts.getBlobs(key);
246        if (CollectionUtils.isEmpty(blobs)) {
247            return null;
248        }
249        return blobs.get(0);
250    }
251
252    /**
253     * Appends the given blob to the given file.
254     */
255    protected void transferTo(Blob blob, File file) throws IOException {
256        try (OutputStream out = new FileOutputStream(file, true)) {
257            try (InputStream in = blob.getStream()) {
258                IOUtils.copy(in, out);
259            }
260        }
261    }
262
263    public String addChunk(int index, Blob blob) {
264        if (!isChunked()) {
265            throw new NuxeoException("Cannot add a chunk to a non chunked file entry.");
266        }
267        int chunkCount = getChunkCount();
268        if (index < 0) {
269            throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", index));
270        }
271        if (index >= chunkCount) {
272            throw new NuxeoException(String.format(
273                    "Cannot add chunk with index %d to file entry %s as chunk count is %d.", index, key, chunkCount));
274        }
275        if (getChunks().containsKey(index)) {
276            throw new NuxeoException(
277                    String.format("Cannot add chunk with index %d to file entry %s as it already exists.", index, key));
278        }
279
280        String chunkEntryKey = key + "_" + index;
281        BatchManager bm = Framework.getService(BatchManager.class);
282        TransientStore ts = bm.getTransientStore();
283        ts.putBlobs(chunkEntryKey, Collections.singletonList(blob));
284        ts.putParameter(key, String.valueOf(index), chunkEntryKey);
285
286        return chunkEntryKey;
287    }
288
289    public void beforeRemove() {
290        BatchManager bm = Framework.getService(BatchManager.class);
291        String tmpChunkedFilePath = (String) bm.getTransientStore().getParameter(key, "tmpChunkedFilePath");
292        if (tmpChunkedFilePath != null) {
293            File tmpChunkedFile = new File(tmpChunkedFilePath);
294            if (tmpChunkedFile.exists()) {
295                log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFilePath));
296                tmpChunkedFile.delete();
297            }
298        }
299    }
300}