001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.ecm.automation.server.jaxrs.batch;
020
021import java.io.File;
022import java.io.FileOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.io.Serializable;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.collections.CollectionUtils;
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.lang.StringUtils;
037import org.apache.commons.lang.math.NumberUtils;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.Blob;
041import org.nuxeo.ecm.core.api.Blobs;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.transientstore.api.TransientStore;
044import org.nuxeo.runtime.api.Framework;
045
046/**
047 * Represents a batch file backed by the {@link TransientStore}.
048 * <p>
049 * The file can be chunked or not. If it is chunked it references its chunks as {@link TransientStore} entry keys.
050 *
051 * @since 7.4
052 * @see Batch
053 */
054public class BatchFileEntry {
055
056    protected static final Log log = LogFactory.getLog(BatchFileEntry.class);
057
058    protected String key;
059
060    protected Map<String, Serializable> params;
061
062    protected Blob blob;
063
064    protected Blob chunkedBlob;
065
066    /**
067     * Returns a file entry that holds the given blob, not chunked.
068     */
069    public BatchFileEntry(String key, Blob blob) {
070        this(key, false);
071        this.blob = blob;
072    }
073
074    /**
075     * Returns a file entry that references the file chunks.
076     *
077     * @see BatchChunkEntry
078     */
079    public BatchFileEntry(String key, int chunkCount, String fileName, String mimeType, long fileSize) {
080        this(key, true);
081        params.put("chunkCount", String.valueOf(chunkCount));
082        if (!StringUtils.isEmpty(fileName)) {
083            params.put("fileName", fileName);
084        }
085        if (!StringUtils.isEmpty(mimeType)) {
086            params.put("mimeType", mimeType);
087        }
088        params.put("fileSize", String.valueOf(fileSize));
089    }
090
091    /**
092     * Returns a file entry that holds the given parameters.
093     */
094    public BatchFileEntry(String key, Map<String, Serializable> params) {
095        this.key = key;
096        this.params = params;
097    }
098
099    protected BatchFileEntry(String key, boolean chunked) {
100        this.key = key;
101        params = new HashMap<>();
102        params.put(Batch.CHUNKED_PARAM_NAME, String.valueOf(chunked));
103    }
104
105    public String getKey() {
106        return key;
107    }
108
109    public Map<String, Serializable> getParams() {
110        return params;
111    }
112
113    public boolean isChunked() {
114        return Boolean.parseBoolean((String) params.get(Batch.CHUNKED_PARAM_NAME));
115    }
116
117    public String getFileName() {
118        if (isChunked()) {
119            return (String) params.get("fileName");
120        } else {
121            Blob blob = getBlob();
122            if (blob == null) {
123                return null;
124            } else {
125                return blob.getFilename();
126            }
127        }
128    }
129
130    public String getMimeType() {
131        if (isChunked()) {
132            return (String) params.get("mimeType");
133        } else {
134            Blob blob = getBlob();
135            if (blob == null) {
136                return null;
137            } else {
138                return blob.getMimeType();
139            }
140        }
141    }
142
143    public long getFileSize() {
144        if (isChunked()) {
145            return Long.parseLong((String) params.get("fileSize"));
146        } else {
147            Blob blob = getBlob();
148            if (blob == null) {
149                return -1;
150            } else {
151                return blob.getLength();
152            }
153        }
154    }
155
156    public int getChunkCount() {
157        if (!isChunked()) {
158            throw new NuxeoException(String.format("Cannot get chunk count of file entry %s as it is not chunked", key));
159        }
160        return Integer.parseInt((String) params.get("chunkCount"));
161    }
162
163    public Map<Integer, String> getChunks() {
164        if (!isChunked()) {
165            throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", key));
166        }
167        Map<Integer, String> chunks = new HashMap<>();
168        for (String param : params.keySet()) {
169            if (NumberUtils.isDigits(param)) {
170                chunks.put(Integer.parseInt(param), (String) params.get(param));
171            }
172        }
173        return chunks;
174    }
175
176    public List<Integer> getOrderedChunkIndexes() {
177        if (!isChunked()) {
178            throw new NuxeoException(String.format("Cannot get chunk indexes of file entry %s as it is not chunked",
179                    key));
180        }
181        List<Integer> sortedChunkIndexes = new ArrayList<Integer>(getChunks().keySet());
182        Collections.sort(sortedChunkIndexes);
183        return sortedChunkIndexes;
184    }
185
186    public Collection<String> getChunkEntryKeys() {
187        if (!isChunked()) {
188            throw new NuxeoException(String.format("Cannot get chunk entry keys of file entry %s as it is not chunked",
189                    key));
190        }
191        return getChunks().values();
192    }
193
194    public boolean isChunksCompleted() {
195        return getChunks().size() == getChunkCount();
196    }
197
198    public Blob getBlob() {
199        if (isChunked()) {
200            // First check if blob chunks have already been read and concatenated
201            if (chunkedBlob != null) {
202                return chunkedBlob;
203            }
204            File tmpChunkedFile = null;
205            try {
206                Map<Integer, String> chunks = getChunks();
207                int uploadedChunkCount = chunks.size();
208                int chunkCount = getChunkCount();
209                if (uploadedChunkCount != chunkCount) {
210                    log.warn(String.format(
211                            "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.", key,
212                            uploadedChunkCount, chunkCount));
213                    return null;
214                }
215                chunkedBlob = Blobs.createBlobWithExtension(null);
216                // Temporary file made from concatenated chunks
217                tmpChunkedFile = chunkedBlob.getFile();
218                BatchManager bm = Framework.getService(BatchManager.class);
219                TransientStore ts = bm.getTransientStore();
220                // Sort chunk indexes and concatenate them to build the entire blob
221                List<Integer> sortedChunkIndexes = getOrderedChunkIndexes();
222                for (int index : sortedChunkIndexes) {
223                    Blob chunk = getChunk(ts, chunks.get(index));
224                    if (chunk != null) {
225                        transferTo(chunk, tmpChunkedFile);
226                    }
227                }
228                // Store tmpChunkedFile as a parameter for later deletion
229                ts.putParameter(key, "tmpChunkedFilePath", tmpChunkedFile.getAbsolutePath());
230                chunkedBlob.setMimeType(getMimeType());
231                chunkedBlob.setFilename(getFileName());
232                return chunkedBlob;
233            } catch (IOException ioe) {
234                if (tmpChunkedFile != null && tmpChunkedFile.exists()) {
235                    tmpChunkedFile.delete();
236                }
237                chunkedBlob = null;
238                throw new NuxeoException(ioe);
239            }
240        } else {
241            return blob;
242        }
243    }
244
245    protected Blob getChunk(TransientStore ts, String key) {
246        List<Blob> blobs = ts.getBlobs(key);
247        if (CollectionUtils.isEmpty(blobs)) {
248            return null;
249        }
250        return blobs.get(0);
251    }
252
253    /**
254     * Appends the given blob to the given file.
255     */
256    protected void transferTo(Blob blob, File file) throws IOException {
257        try (OutputStream out = new FileOutputStream(file, true)) {
258            try (InputStream in = blob.getStream()) {
259                IOUtils.copy(in, out);
260            }
261        }
262    }
263
264    public String addChunk(int index, Blob blob) {
265        if (!isChunked()) {
266            throw new NuxeoException("Cannot add a chunk to a non chunked file entry.");
267        }
268        int chunkCount = getChunkCount();
269        if (index < 0) {
270            throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", index));
271        }
272        if (index >= chunkCount) {
273            throw new NuxeoException(String.format(
274                    "Cannot add chunk with index %d to file entry %s as chunk count is %d.", index, key, chunkCount));
275        }
276        if (getChunks().containsKey(index)) {
277            throw new NuxeoException(String.format(
278                    "Cannot add chunk with index %d to file entry %s as it already exists.", index, key));
279        }
280
281        String chunkEntryKey = key + "_" + index;
282        BatchManager bm = Framework.getService(BatchManager.class);
283        TransientStore ts = bm.getTransientStore();
284        ts.putBlobs(chunkEntryKey, Collections.singletonList(blob));
285        ts.putParameter(key, String.valueOf(index), chunkEntryKey);
286
287        return chunkEntryKey;
288    }
289
290    public void beforeRemove() {
291        BatchManager bm = Framework.getService(BatchManager.class);
292        String tmpChunkedFilePath = (String) bm.getTransientStore().getParameter(key, "tmpChunkedFilePath");
293        if (tmpChunkedFilePath != null) {
294            File tmpChunkedFile = new File(tmpChunkedFilePath);
295            if (tmpChunkedFile.exists()) {
296                log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFilePath));
297                tmpChunkedFile.delete();
298            }
299        }
300    }
301}