001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Antoine Taillefer <ataillefer@nuxeo.com>
016 */
017package org.nuxeo.ecm.automation.server.jaxrs.batch;
018
019import java.io.File;
020import java.io.FileOutputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031
032import org.apache.commons.collections.CollectionUtils;
033import org.apache.commons.io.IOUtils;
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.lang.math.NumberUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.core.api.Blob;
039import org.nuxeo.ecm.core.api.Blobs;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.ecm.core.transientstore.api.TransientStore;
042import org.nuxeo.runtime.api.Framework;
043
044/**
045 * Represents a batch file backed by the {@link TransientStore}.
046 * <p>
047 * The file can be chunked or not. If it is chunked it references its chunks as {@link TransientStore} entry keys.
048 *
049 * @since 7.4
050 * @see Batch
051 */
052public class BatchFileEntry {
053
054    protected static final Log log = LogFactory.getLog(BatchFileEntry.class);
055
056    protected String key;
057
058    protected Map<String, Serializable> params;
059
060    protected Blob blob;
061
062    protected Blob chunkedBlob;
063
064    /**
065     * Returns a file entry that holds the given blob, not chunked.
066     */
067    public BatchFileEntry(String key, Blob blob) {
068        this(key, false);
069        this.blob = blob;
070    }
071
072    /**
073     * Returns a file entry that references the file chunks.
074     *
075     * @see BatchChunkEntry
076     */
077    public BatchFileEntry(String key, int chunkCount, String fileName, String mimeType, long fileSize) {
078        this(key, true);
079        params.put("chunkCount", String.valueOf(chunkCount));
080        if (!StringUtils.isEmpty(fileName)) {
081            params.put("fileName", fileName);
082        }
083        if (!StringUtils.isEmpty(mimeType)) {
084            params.put("mimeType", mimeType);
085        }
086        params.put("fileSize", String.valueOf(fileSize));
087    }
088
089    /**
090     * Returns a file entry that holds the given parameters.
091     */
092    public BatchFileEntry(String key, Map<String, Serializable> params) {
093        this.key = key;
094        this.params = params;
095    }
096
097    protected BatchFileEntry(String key, boolean chunked) {
098        this.key = key;
099        params = new HashMap<>();
100        params.put(Batch.CHUNKED_PARAM_NAME, String.valueOf(chunked));
101    }
102
103    public String getKey() {
104        return key;
105    }
106
107    public Map<String, Serializable> getParams() {
108        return params;
109    }
110
111    public boolean isChunked() {
112        return Boolean.parseBoolean((String) params.get(Batch.CHUNKED_PARAM_NAME));
113    }
114
115    public String getFileName() {
116        if (isChunked()) {
117            return (String) params.get("fileName");
118        } else {
119            Blob blob = getBlob();
120            if (blob == null) {
121                return null;
122            } else {
123                return blob.getFilename();
124            }
125        }
126    }
127
128    public String getMimeType() {
129        if (isChunked()) {
130            return (String) params.get("mimeType");
131        } else {
132            Blob blob = getBlob();
133            if (blob == null) {
134                return null;
135            } else {
136                return blob.getMimeType();
137            }
138        }
139    }
140
141    public long getFileSize() {
142        if (isChunked()) {
143            return Long.parseLong((String) params.get("fileSize"));
144        } else {
145            Blob blob = getBlob();
146            if (blob == null) {
147                return -1;
148            } else {
149                return blob.getLength();
150            }
151        }
152    }
153
154    public int getChunkCount() {
155        if (!isChunked()) {
156            throw new NuxeoException(String.format("Cannot get chunk count of file entry %s as it is not chunked", key));
157        }
158        return Integer.parseInt((String) params.get("chunkCount"));
159    }
160
161    public Map<Integer, String> getChunks() {
162        if (!isChunked()) {
163            throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", key));
164        }
165        Map<Integer, String> chunks = new HashMap<>();
166        for (String param : params.keySet()) {
167            if (NumberUtils.isDigits(param)) {
168                chunks.put(Integer.parseInt(param), (String) params.get(param));
169            }
170        }
171        return chunks;
172    }
173
174    public List<Integer> getOrderedChunkIndexes() {
175        if (!isChunked()) {
176            throw new NuxeoException(String.format("Cannot get chunk indexes of file entry %s as it is not chunked",
177                    key));
178        }
179        List<Integer> sortedChunkIndexes = new ArrayList<Integer>(getChunks().keySet());
180        Collections.sort(sortedChunkIndexes);
181        return sortedChunkIndexes;
182    }
183
184    public Collection<String> getChunkEntryKeys() {
185        if (!isChunked()) {
186            throw new NuxeoException(String.format("Cannot get chunk entry keys of file entry %s as it is not chunked",
187                    key));
188        }
189        return getChunks().values();
190    }
191
192    public boolean isChunksCompleted() {
193        return getChunks().size() == getChunkCount();
194    }
195
196    public Blob getBlob() {
197        if (isChunked()) {
198            // First check if blob chunks have already been read and concatenated
199            if (chunkedBlob != null) {
200                return chunkedBlob;
201            }
202            File tmpChunkedFile = null;
203            try {
204                Map<Integer, String> chunks = getChunks();
205                int uploadedChunkCount = chunks.size();
206                int chunkCount = getChunkCount();
207                if (uploadedChunkCount != chunkCount) {
208                    log.warn(String.format(
209                            "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.", key,
210                            uploadedChunkCount, chunkCount));
211                    return null;
212                }
213                chunkedBlob = Blobs.createBlobWithExtension(null);
214                // Temporary file made from concatenated chunks
215                tmpChunkedFile = chunkedBlob.getFile();
216                BatchManager bm = Framework.getService(BatchManager.class);
217                TransientStore ts = bm.getTransientStore();
218                // Sort chunk indexes and concatenate them to build the entire blob
219                List<Integer> sortedChunkIndexes = getOrderedChunkIndexes();
220                for (int index : sortedChunkIndexes) {
221                    Blob chunk = getChunk(ts, chunks.get(index));
222                    if (chunk != null) {
223                        transferTo(chunk, tmpChunkedFile);
224                    }
225                }
226                // Store tmpChunkedFile as a parameter for later deletion
227                ts.putParameter(key, "tmpChunkedFilePath", tmpChunkedFile.getAbsolutePath());
228                chunkedBlob.setMimeType(getMimeType());
229                chunkedBlob.setFilename(getFileName());
230                return chunkedBlob;
231            } catch (IOException ioe) {
232                if (tmpChunkedFile != null && tmpChunkedFile.exists()) {
233                    tmpChunkedFile.delete();
234                }
235                chunkedBlob = null;
236                throw new NuxeoException(ioe);
237            }
238        } else {
239            return blob;
240        }
241    }
242
243    protected Blob getChunk(TransientStore ts, String key) {
244        List<Blob> blobs = ts.getBlobs(key);
245        if (CollectionUtils.isEmpty(blobs)) {
246            return null;
247        }
248        return blobs.get(0);
249    }
250
251    /**
252     * Appends the given blob to the given file.
253     */
254    protected void transferTo(Blob blob, File file) throws IOException {
255        try (OutputStream out = new FileOutputStream(file, true)) {
256            try (InputStream in = blob.getStream()) {
257                IOUtils.copy(in, out);
258            }
259        }
260    }
261
262    public String addChunk(int index, Blob blob) {
263        if (!isChunked()) {
264            throw new NuxeoException("Cannot add a chunk to a non chunked file entry.");
265        }
266        int chunkCount = getChunkCount();
267        if (index < 0) {
268            throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", index));
269        }
270        if (index >= chunkCount) {
271            throw new NuxeoException(String.format(
272                    "Cannot add chunk with index %d to file entry %s as chunk count is %d.", index, key, chunkCount));
273        }
274        if (getChunks().containsKey(index)) {
275            throw new NuxeoException(String.format(
276                    "Cannot add chunk with index %d to file entry %s as it already exists.", index, key));
277        }
278
279        String chunkEntryKey = key + "_" + index;
280        BatchManager bm = Framework.getService(BatchManager.class);
281        TransientStore ts = bm.getTransientStore();
282        ts.putBlobs(chunkEntryKey, Collections.singletonList(blob));
283        ts.putParameter(key, String.valueOf(index), chunkEntryKey);
284
285        return chunkEntryKey;
286    }
287
288    public void beforeRemove() {
289        BatchManager bm = Framework.getService(BatchManager.class);
290        String tmpChunkedFilePath = (String) bm.getTransientStore().getParameter(key, "tmpChunkedFilePath");
291        if (tmpChunkedFilePath != null) {
292            File tmpChunkedFile = new File(tmpChunkedFilePath);
293            if (tmpChunkedFile.exists()) {
294                log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFilePath));
295                tmpChunkedFile.delete();
296            }
297        }
298    }
299}