001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.ecm.automation.server.jaxrs.batch;
020
021import java.io.File;
022import java.io.FileOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.io.Serializable;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.collections.CollectionUtils;
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.commons.lang3.math.NumberUtils;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.Blob;
041import org.nuxeo.ecm.core.api.Blobs;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.transientstore.api.TransientStore;
044import org.nuxeo.runtime.api.Framework;
045
046/**
047 * Represents a batch file backed by the {@link TransientStore}.
048 * <p>
049 * The file can be chunked or not. If it is chunked it references its chunks as {@link TransientStore} entry keys.
050 *
051 * @since 7.4
052 * @see Batch
053 */
054public class BatchFileEntry {
055
056    protected static final Log log = LogFactory.getLog(BatchFileEntry.class);
057
058    protected TransientStore transientStore;
059
060    protected String key;
061
062    protected Map<String, Serializable> params;
063
064    protected Blob blob;
065
066    protected Blob chunkedBlob;
067
068    /**
069     * Returns a file entry that holds the given blob, not chunked.
070     */
071    public BatchFileEntry(TransientStore transientStore, String key, Blob blob) {
072        this(transientStore, key, false);
073        this.blob = blob;
074    }
075
076    /**
077     * Returns a file entry that references the file chunks.
078     */
079    public BatchFileEntry(TransientStore transientStore, String key, int chunkCount, String fileName, String mimeType,
080            long fileSize) {
081        this(transientStore, key, true);
082        params.put("chunkCount", String.valueOf(chunkCount));
083        if (!StringUtils.isEmpty(fileName)) {
084            params.put("fileName", fileName);
085        }
086        if (!StringUtils.isEmpty(mimeType)) {
087            params.put("mimeType", mimeType);
088        }
089        params.put("fileSize", String.valueOf(fileSize));
090    }
091
092    /**
093     * Returns a file entry that holds the given parameters.
094     */
095    public BatchFileEntry(TransientStore transientStore, String key, Map<String, Serializable> params) {
096        this.transientStore = transientStore;
097        this.key = key;
098        this.params = params;
099    }
100
101    protected BatchFileEntry(TransientStore transientStore, String key, boolean chunked) {
102        this(transientStore, key, new HashMap<>());
103        params.put(Batch.CHUNKED_PARAM_NAME, String.valueOf(chunked));
104    }
105
106    public String getKey() {
107        return key;
108    }
109
110    public Map<String, Serializable> getParams() {
111        return params;
112    }
113
114    public boolean isChunked() {
115        return Boolean.parseBoolean((String) params.get(Batch.CHUNKED_PARAM_NAME));
116    }
117
118    public String getFileName() {
119        if (isChunked()) {
120            return (String) params.get("fileName");
121        } else {
122            Blob blob = getBlob();
123            if (blob == null) {
124                return null;
125            } else {
126                return blob.getFilename();
127            }
128        }
129    }
130
131    public String getMimeType() {
132        if (isChunked()) {
133            return (String) params.get("mimeType");
134        } else {
135            Blob blob = getBlob();
136            if (blob == null) {
137                return null;
138            } else {
139                return blob.getMimeType();
140            }
141        }
142    }
143
144    public long getFileSize() {
145        if (isChunked()) {
146            return Long.parseLong((String) params.get("fileSize"));
147        } else {
148            Blob blob = getBlob();
149            if (blob == null) {
150                return -1;
151            } else {
152                return blob.getLength();
153            }
154        }
155    }
156
157    public int getChunkCount() {
158        if (!isChunked()) {
159            throw new NuxeoException(
160                    String.format("Cannot get chunk count of file entry %s as it is not chunked", key));
161        }
162        return Integer.parseInt((String) params.get("chunkCount"));
163    }
164
165    public Map<Integer, String> getChunks() {
166        if (!isChunked()) {
167            throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", key));
168        }
169        Map<Integer, String> chunks = new HashMap<>();
170        for (String param : params.keySet()) {
171            if (NumberUtils.isDigits(param)) {
172                chunks.put(Integer.valueOf(param), (String) params.get(param));
173            }
174        }
175        return chunks;
176    }
177
178    public List<Integer> getOrderedChunkIndexes() {
179        if (!isChunked()) {
180            throw new NuxeoException(
181                    String.format("Cannot get chunk indexes of file entry %s as it is not chunked", key));
182        }
183        List<Integer> sortedChunkIndexes = new ArrayList<>(getChunks().keySet());
184        Collections.sort(sortedChunkIndexes);
185        return sortedChunkIndexes;
186    }
187
188    public Collection<String> getChunkEntryKeys() {
189        if (!isChunked()) {
190            throw new NuxeoException(
191                    String.format("Cannot get chunk entry keys of file entry %s as it is not chunked", key));
192        }
193        return getChunks().values();
194    }
195
196    public boolean isChunksCompleted() {
197        return getChunks().size() == getChunkCount();
198    }
199
200    public Blob getBlob() {
201        if (isChunked()) {
202            // First check if blob chunks have already been read and concatenated
203            if (chunkedBlob != null) {
204                return chunkedBlob;
205            }
206            File tmpChunkedFile = null;
207            try {
208                Map<Integer, String> chunks = getChunks();
209                int uploadedChunkCount = chunks.size();
210                int chunkCount = getChunkCount();
211                if (uploadedChunkCount != chunkCount) {
212                    log.warn(String.format(
213                            "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.", key,
214                            uploadedChunkCount, chunkCount));
215                    return null;
216                }
217                chunkedBlob = Blobs.createBlobWithExtension(null);
218                // Temporary file made from concatenated chunks
219                tmpChunkedFile = chunkedBlob.getFile();
220                // Sort chunk indexes and concatenate them to build the entire blob
221                List<Integer> sortedChunkIndexes = getOrderedChunkIndexes();
222                for (int index : sortedChunkIndexes) {
223                    Blob chunk = getChunk(transientStore, chunks.get(index));
224                    if (chunk != null) {
225                        transferTo(chunk, tmpChunkedFile);
226                    }
227                }
228                // Store tmpChunkedFile as a parameter for later deletion
229                transientStore.putParameter(key, "tmpChunkedFilePath", tmpChunkedFile.getAbsolutePath());
230                chunkedBlob.setMimeType(getMimeType());
231                chunkedBlob.setFilename(getFileName());
232                return chunkedBlob;
233            } catch (IOException ioe) {
234                if (tmpChunkedFile != null && tmpChunkedFile.exists()) {
235                    tmpChunkedFile.delete();
236                }
237                chunkedBlob = null;
238                throw new NuxeoException(ioe);
239            }
240        } else {
241            return blob;
242        }
243    }
244
245    protected Blob getChunk(TransientStore ts, String key) {
246        List<Blob> blobs = ts.getBlobs(key);
247        if (CollectionUtils.isEmpty(blobs)) {
248            return null;
249        }
250        return blobs.get(0);
251    }
252
253    /**
254     * Appends the given blob to the given file.
255     */
256    protected void transferTo(Blob blob, File file) throws IOException {
257        try (OutputStream out = new FileOutputStream(file, true)) {
258            try (InputStream in = blob.getStream()) {
259                IOUtils.copy(in, out);
260            }
261        }
262    }
263
264    public String addChunk(int index, Blob blob) {
265        if (!isChunked()) {
266            throw new NuxeoException("Cannot add a chunk to a non chunked file entry.");
267        }
268        int chunkCount = getChunkCount();
269        if (index < 0) {
270            throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", index));
271        }
272        if (index >= chunkCount) {
273            throw new NuxeoException(String.format(
274                    "Cannot add chunk with index %d to file entry %s as chunk count is %d.", index, key, chunkCount));
275        }
276        if (getChunks().containsKey(index)) {
277            throw new NuxeoException(
278                    String.format("Cannot add chunk with index %d to file entry %s as it already exists.", index, key));
279        }
280
281        String chunkEntryKey = key + "_" + index;
282        transientStore.putBlobs(chunkEntryKey, Collections.singletonList(blob));
283        transientStore.putParameter(key, String.valueOf(index), chunkEntryKey);
284
285        return chunkEntryKey;
286    }
287
288    public void beforeRemove() {
289        String tmpChunkedFilePath = (String) transientStore.getParameter(key, "tmpChunkedFilePath");
290        if (tmpChunkedFilePath != null) {
291            File tmpChunkedFile = new File(tmpChunkedFilePath);
292            if (tmpChunkedFile.exists()) {
293                log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFilePath));
294                tmpChunkedFile.delete();
295            }
296        }
297    }
298}