001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Antoine Taillefer <ataillefer@nuxeo.com>
016 */
017package org.nuxeo.ecm.automation.server.jaxrs.batch;
018
019import java.io.File;
020import java.io.FileOutputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030
031import org.apache.commons.collections.CollectionUtils;
032import org.apache.commons.io.IOUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.core.api.Blob;
036import org.nuxeo.ecm.core.api.Blobs;
037import org.nuxeo.ecm.core.api.NuxeoException;
038import org.nuxeo.ecm.core.transientstore.AbstractStorageEntry;
039import org.nuxeo.ecm.core.transientstore.api.TransientStore;
040import org.nuxeo.runtime.api.Framework;
041
042/**
043 * Represents a batch file backed by the {@link TransientStore}.
044 * <p>
045 * The file can be chunked or not. If it is chunked it references its chunks as {@link BatchChunkEntry} objects.
046 *
047 * @since 7.4
048 * @see Batch
049 */
050public class BatchFileEntry extends AbstractStorageEntry {
051
052    private static final long serialVersionUID = 1L;
053
054    protected static final Log log = LogFactory.getLog(BatchFileEntry.class);
055
056    protected Blob chunkedBlob;
057
058    // Temporary file made from concatenated chunks
059    protected File tmpChunkedFile;
060
061    /**
062     * Returns a file entry that holds the given blob, not chunked.
063     */
064    public BatchFileEntry(String id, Blob blob) {
065        super(id);
066        put("chunked", false);
067        setBlobs(Collections.singletonList(blob));
068    }
069
070    /**
071     * Returns a file entry that references the file chunks.
072     *
073     * @see BatchChunkEntry
074     */
075    public BatchFileEntry(String id, int chunkCount, String fileName, String mime, long fileSize) {
076        super(id);
077        put("chunked", true);
078        put("chunkCount", chunkCount);
079        put("chunks", new HashMap<Integer, String>());
080        put("fileName", fileName);
081        put("mimeType", mime);
082        put("fileSize", fileSize);
083    }
084
085    public boolean isChunked() {
086        return (Boolean) get("chunked");
087    }
088
089    public String getFileName() {
090        if (isChunked()) {
091            return (String) get("fileName");
092        } else {
093            Blob blob = getBlob();
094            if (blob == null) {
095                return null;
096            } else {
097                return blob.getFilename();
098            }
099        }
100    }
101
102    public String getMimeType() {
103        if (isChunked()) {
104            return (String) get("mimeType");
105        } else {
106            Blob blob = getBlob();
107            if (blob == null) {
108                return null;
109            } else {
110                return blob.getMimeType();
111            }
112        }
113    }
114
115    public long getFileSize() {
116        if (isChunked()) {
117            return (long) get("fileSize");
118        } else {
119            Blob blob = getBlob();
120            if (blob == null) {
121                return -1;
122            } else {
123                return blob.getLength();
124            }
125        }
126    }
127
128    public int getChunkCount() {
129        if (!isChunked()) {
130            throw new NuxeoException(String.format("Cannot get chunk count of file entry %s as it is not chunked",
131                    getId()));
132        }
133        return (int) get("chunkCount");
134    }
135
136    @SuppressWarnings("unchecked")
137    public Map<Integer, String> getChunks() {
138        if (!isChunked()) {
139            throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", getId()));
140        }
141        return (Map<Integer, String>) get("chunks");
142    }
143
144    public List<Integer> getOrderedChunkIds() {
145        if (!isChunked()) {
146            throw new NuxeoException(String.format("Cannot get chunk ids of file entry %s as it is not chunked",
147                    getId()));
148        }
149        List<Integer> sortedChunkIds = new ArrayList<Integer>(getChunks().keySet());
150        Collections.sort(sortedChunkIds);
151        return sortedChunkIds;
152    }
153
154    public Collection<String> getChunkEntryIds() {
155        if (!isChunked()) {
156            throw new NuxeoException(String.format("Cannot get chunk entry ids of file entry %s as it is not chunked",
157                    getId()));
158        }
159        return getChunks().values();
160    }
161
162    public boolean isChunksCompleted() {
163        return getChunks().size() == getChunkCount();
164    }
165
166    public Blob getBlob() {
167        if (isChunked()) {
168            // First check if blob chunks have already been read and concatenated
169            if (chunkedBlob != null) {
170                return chunkedBlob;
171            }
172            try {
173                Map<Integer, String> chunks = getChunks();
174                int uploadedChunkCount = chunks.size();
175                int chunkCount = getChunkCount();
176                if (uploadedChunkCount != chunkCount) {
177                    log.warn(String.format(
178                            "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.",
179                            getId(), uploadedChunkCount, chunkCount));
180                    return null;
181                }
182                chunkedBlob = Blobs.createBlobWithExtension(null);
183                tmpChunkedFile = chunkedBlob.getFile();
184                BatchManager bm = Framework.getService(BatchManager.class);
185                // Sort chunk ids and concatenate them to build the entire blob
186                List<Integer> sortedChunks = getOrderedChunkIds();
187                for (int idx : sortedChunks) {
188                    BatchChunkEntry chunkEntry = (BatchChunkEntry) bm.getTransientStore().get(chunks.get(idx));
189                    Blob chunkBlob = chunkEntry.getBlob();
190                    if (chunkBlob != null) {
191                        transferTo(chunkBlob, tmpChunkedFile);
192                    }
193                }
194                chunkedBlob.setMimeType(getMimeType());
195                chunkedBlob.setFilename(getFileName());
196                return chunkedBlob;
197            } catch (IOException ioe) {
198                beforeRemove();
199                chunkedBlob = null;
200                throw new NuxeoException(ioe);
201            }
202        } else {
203            List<Blob> blobs = getBlobs();
204            if (CollectionUtils.isEmpty(blobs)) {
205                return null;
206            }
207            return blobs.get(0);
208        }
209    }
210
211    /**
212     * Appends the given blob to the given file.
213     */
214    protected void transferTo(Blob blob, File file) throws IOException {
215        try (OutputStream out = new FileOutputStream(file, true)) {
216            try (InputStream in = blob.getStream()) {
217                IOUtils.copy(in, out);
218            }
219        }
220    }
221
222    public String addChunk(int idx, Blob blob) {
223        if (!isChunked()) {
224            throw new NuxeoException("Cannot add a chunk to a non chunked file entry.");
225        }
226        int chunkCount = getChunkCount();
227        if (idx < 0) {
228            throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", idx));
229        }
230        if (idx >= chunkCount) {
231            throw new NuxeoException(String.format(
232                    "Cannot add chunk with index %d to file entry %s as chunk count is %d.", idx, getId(), chunkCount));
233        }
234        if (getChunks().containsKey(idx)) {
235            throw new NuxeoException(String.format(
236                    "Cannot add chunk with index %d to file entry %s as it already exists.", idx, getId()));
237        }
238
239        String chunkEntryId = getId() + "_" + idx;
240        BatchChunkEntry chunkEntry = new BatchChunkEntry(chunkEntryId, blob);
241
242        BatchManager bm = Framework.getService(BatchManager.class);
243        bm.getTransientStore().put(chunkEntry);
244
245        return chunkEntryId;
246    }
247
248    @Override
249    public List<Blob> getBlobs() {
250        if (isChunked()) {
251            return Collections.singletonList(getBlob());
252        } else {
253            return super.getBlobs();
254        }
255    }
256
257    @Override
258    public void beforeRemove() {
259        if (tmpChunkedFile != null && tmpChunkedFile.exists()) {
260            log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFile.getAbsolutePath()));
261            tmpChunkedFile.delete();
262        }
263    }
264}