001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Antoine Taillefer <ataillefer@nuxeo.com> 016 */ 017package org.nuxeo.ecm.automation.server.jaxrs.batch; 018 019import java.io.File; 020import java.io.FileOutputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.io.Serializable; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031 032import org.apache.commons.collections.CollectionUtils; 033import org.apache.commons.io.IOUtils; 034import org.apache.commons.lang.StringUtils; 035import org.apache.commons.lang.math.NumberUtils; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.nuxeo.ecm.core.api.Blob; 039import org.nuxeo.ecm.core.api.Blobs; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.transientstore.api.TransientStore; 042import org.nuxeo.runtime.api.Framework; 043 044/** 045 * Represents a batch file backed by the {@link TransientStore}. 046 * <p> 047 * The file can be chunked or not. If it is chunked it references its chunks as {@link TransientStore} entry keys. 048 * 049 * @since 7.4 050 * @see Batch 051 */ 052public class BatchFileEntry { 053 054 protected static final Log log = LogFactory.getLog(BatchFileEntry.class); 055 056 protected String key; 057 058 protected Map<String, Serializable> params; 059 060 protected Blob blob; 061 062 protected Blob chunkedBlob; 063 064 /** 065 * Returns a file entry that holds the given blob, not chunked. 066 */ 067 public BatchFileEntry(String key, Blob blob) { 068 this(key, false); 069 this.blob = blob; 070 } 071 072 /** 073 * Returns a file entry that references the file chunks. 074 * 075 * @see BatchChunkEntry 076 */ 077 public BatchFileEntry(String key, int chunkCount, String fileName, String mimeType, long fileSize) { 078 this(key, true); 079 params.put("chunkCount", String.valueOf(chunkCount)); 080 if (!StringUtils.isEmpty(fileName)) { 081 params.put("fileName", fileName); 082 } 083 if (!StringUtils.isEmpty(mimeType)) { 084 params.put("mimeType", mimeType); 085 } 086 params.put("fileSize", String.valueOf(fileSize)); 087 } 088 089 /** 090 * Returns a file entry that holds the given parameters. 091 */ 092 public BatchFileEntry(String key, Map<String, Serializable> params) { 093 this.key = key; 094 this.params = params; 095 } 096 097 protected BatchFileEntry(String key, boolean chunked) { 098 this.key = key; 099 params = new HashMap<>(); 100 params.put(Batch.CHUNKED_PARAM_NAME, String.valueOf(chunked)); 101 } 102 103 public String getKey() { 104 return key; 105 } 106 107 public Map<String, Serializable> getParams() { 108 return params; 109 } 110 111 public boolean isChunked() { 112 return Boolean.parseBoolean((String) params.get(Batch.CHUNKED_PARAM_NAME)); 113 } 114 115 public String getFileName() { 116 if (isChunked()) { 117 return (String) params.get("fileName"); 118 } else { 119 Blob blob = getBlob(); 120 if (blob == null) { 121 return null; 122 } else { 123 return blob.getFilename(); 124 } 125 } 126 } 127 128 public String getMimeType() { 129 if (isChunked()) { 130 return (String) params.get("mimeType"); 131 } else { 132 Blob blob = getBlob(); 133 if (blob == null) { 134 return null; 135 } else { 136 return blob.getMimeType(); 137 } 138 } 139 } 140 141 public long getFileSize() { 142 if (isChunked()) { 143 return Long.parseLong((String) params.get("fileSize")); 144 } else { 145 Blob blob = getBlob(); 146 if (blob == null) { 147 return -1; 148 } else { 149 return blob.getLength(); 150 } 151 } 152 } 153 154 public int getChunkCount() { 155 if (!isChunked()) { 156 throw new NuxeoException(String.format("Cannot get chunk count of file entry %s as it is not chunked", key)); 157 } 158 return Integer.parseInt((String) params.get("chunkCount")); 159 } 160 161 public Map<Integer, String> getChunks() { 162 if (!isChunked()) { 163 throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", key)); 164 } 165 Map<Integer, String> chunks = new HashMap<>(); 166 for (String param : params.keySet()) { 167 if (NumberUtils.isDigits(param)) { 168 chunks.put(Integer.parseInt(param), (String) params.get(param)); 169 } 170 } 171 return chunks; 172 } 173 174 public List<Integer> getOrderedChunkIndexes() { 175 if (!isChunked()) { 176 throw new NuxeoException(String.format("Cannot get chunk indexes of file entry %s as it is not chunked", 177 key)); 178 } 179 List<Integer> sortedChunkIndexes = new ArrayList<Integer>(getChunks().keySet()); 180 Collections.sort(sortedChunkIndexes); 181 return sortedChunkIndexes; 182 } 183 184 public Collection<String> getChunkEntryKeys() { 185 if (!isChunked()) { 186 throw new NuxeoException(String.format("Cannot get chunk entry keys of file entry %s as it is not chunked", 187 key)); 188 } 189 return getChunks().values(); 190 } 191 192 public boolean isChunksCompleted() { 193 return getChunks().size() == getChunkCount(); 194 } 195 196 public Blob getBlob() { 197 if (isChunked()) { 198 // First check if blob chunks have already been read and concatenated 199 if (chunkedBlob != null) { 200 return chunkedBlob; 201 } 202 File tmpChunkedFile = null; 203 try { 204 Map<Integer, String> chunks = getChunks(); 205 int uploadedChunkCount = chunks.size(); 206 int chunkCount = getChunkCount(); 207 if (uploadedChunkCount != chunkCount) { 208 log.warn(String.format( 209 "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.", key, 210 uploadedChunkCount, chunkCount)); 211 return null; 212 } 213 chunkedBlob = Blobs.createBlobWithExtension(null); 214 // Temporary file made from concatenated chunks 215 tmpChunkedFile = chunkedBlob.getFile(); 216 BatchManager bm = Framework.getService(BatchManager.class); 217 TransientStore ts = bm.getTransientStore(); 218 // Sort chunk indexes and concatenate them to build the entire blob 219 List<Integer> sortedChunkIndexes = getOrderedChunkIndexes(); 220 for (int index : sortedChunkIndexes) { 221 Blob chunk = getChunk(ts, chunks.get(index)); 222 if (chunk != null) { 223 transferTo(chunk, tmpChunkedFile); 224 } 225 } 226 // Store tmpChunkedFile as a parameter for later deletion 227 ts.putParameter(key, "tmpChunkedFilePath", tmpChunkedFile.getAbsolutePath()); 228 chunkedBlob.setMimeType(getMimeType()); 229 chunkedBlob.setFilename(getFileName()); 230 return chunkedBlob; 231 } catch (IOException ioe) { 232 if (tmpChunkedFile != null && tmpChunkedFile.exists()) { 233 tmpChunkedFile.delete(); 234 } 235 chunkedBlob = null; 236 throw new NuxeoException(ioe); 237 } 238 } else { 239 return blob; 240 } 241 } 242 243 protected Blob getChunk(TransientStore ts, String key) { 244 List<Blob> blobs = ts.getBlobs(key); 245 if (CollectionUtils.isEmpty(blobs)) { 246 return null; 247 } 248 return blobs.get(0); 249 } 250 251 /** 252 * Appends the given blob to the given file. 253 */ 254 protected void transferTo(Blob blob, File file) throws IOException { 255 try (OutputStream out = new FileOutputStream(file, true)) { 256 try (InputStream in = blob.getStream()) { 257 IOUtils.copy(in, out); 258 } 259 } 260 } 261 262 public String addChunk(int index, Blob blob) { 263 if (!isChunked()) { 264 throw new NuxeoException("Cannot add a chunk to a non chunked file entry."); 265 } 266 int chunkCount = getChunkCount(); 267 if (index < 0) { 268 throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", index)); 269 } 270 if (index >= chunkCount) { 271 throw new NuxeoException(String.format( 272 "Cannot add chunk with index %d to file entry %s as chunk count is %d.", index, key, chunkCount)); 273 } 274 if (getChunks().containsKey(index)) { 275 throw new NuxeoException(String.format( 276 "Cannot add chunk with index %d to file entry %s as it already exists.", index, key)); 277 } 278 279 String chunkEntryKey = key + "_" + index; 280 BatchManager bm = Framework.getService(BatchManager.class); 281 TransientStore ts = bm.getTransientStore(); 282 ts.putBlobs(chunkEntryKey, Collections.singletonList(blob)); 283 ts.putParameter(key, String.valueOf(index), chunkEntryKey); 284 285 return chunkEntryKey; 286 } 287 288 public void beforeRemove() { 289 BatchManager bm = Framework.getService(BatchManager.class); 290 String tmpChunkedFilePath = (String) bm.getTransientStore().getParameter(key, "tmpChunkedFilePath"); 291 if (tmpChunkedFilePath != null) { 292 File tmpChunkedFile = new File(tmpChunkedFilePath); 293 if (tmpChunkedFile.exists()) { 294 log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFilePath)); 295 tmpChunkedFile.delete(); 296 } 297 } 298 } 299}