001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Antoine Taillefer <ataillefer@nuxeo.com> 018 */ 019package org.nuxeo.ecm.automation.server.jaxrs.batch; 020 021import java.io.File; 022import java.io.FileOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.OutputStream; 026import java.io.Serializable; 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.List; 032import java.util.Map; 033 034import org.apache.commons.collections.CollectionUtils; 035import org.apache.commons.io.IOUtils; 036import org.apache.commons.lang.StringUtils; 037import org.apache.commons.lang.math.NumberUtils; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.api.Blob; 041import org.nuxeo.ecm.core.api.Blobs; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.transientstore.api.TransientStore; 044import org.nuxeo.runtime.api.Framework; 045 046/** 047 * Represents a batch file backed by the {@link TransientStore}. 048 * <p> 049 * The file can be chunked or not. If it is chunked it references its chunks as {@link TransientStore} entry keys. 050 * 051 * @since 7.4 052 * @see Batch 053 */ 054public class BatchFileEntry { 055 056 protected static final Log log = LogFactory.getLog(BatchFileEntry.class); 057 058 protected String key; 059 060 protected Map<String, Serializable> params; 061 062 protected Blob blob; 063 064 protected Blob chunkedBlob; 065 066 /** 067 * Returns a file entry that holds the given blob, not chunked. 068 */ 069 public BatchFileEntry(String key, Blob blob) { 070 this(key, false); 071 this.blob = blob; 072 } 073 074 /** 075 * Returns a file entry that references the file chunks. 076 * 077 * @see BatchChunkEntry 078 */ 079 public BatchFileEntry(String key, int chunkCount, String fileName, String mimeType, long fileSize) { 080 this(key, true); 081 params.put("chunkCount", String.valueOf(chunkCount)); 082 if (!StringUtils.isEmpty(fileName)) { 083 params.put("fileName", fileName); 084 } 085 if (!StringUtils.isEmpty(mimeType)) { 086 params.put("mimeType", mimeType); 087 } 088 params.put("fileSize", String.valueOf(fileSize)); 089 } 090 091 /** 092 * Returns a file entry that holds the given parameters. 093 */ 094 public BatchFileEntry(String key, Map<String, Serializable> params) { 095 this.key = key; 096 this.params = params; 097 } 098 099 protected BatchFileEntry(String key, boolean chunked) { 100 this.key = key; 101 params = new HashMap<>(); 102 params.put(Batch.CHUNKED_PARAM_NAME, String.valueOf(chunked)); 103 } 104 105 public String getKey() { 106 return key; 107 } 108 109 public Map<String, Serializable> getParams() { 110 return params; 111 } 112 113 public boolean isChunked() { 114 return Boolean.parseBoolean((String) params.get(Batch.CHUNKED_PARAM_NAME)); 115 } 116 117 public String getFileName() { 118 if (isChunked()) { 119 return (String) params.get("fileName"); 120 } else { 121 Blob blob = getBlob(); 122 if (blob == null) { 123 return null; 124 } else { 125 return blob.getFilename(); 126 } 127 } 128 } 129 130 public String getMimeType() { 131 if (isChunked()) { 132 return (String) params.get("mimeType"); 133 } else { 134 Blob blob = getBlob(); 135 if (blob == null) { 136 return null; 137 } else { 138 return blob.getMimeType(); 139 } 140 } 141 } 142 143 public long getFileSize() { 144 if (isChunked()) { 145 return Long.parseLong((String) params.get("fileSize")); 146 } else { 147 Blob blob = getBlob(); 148 if (blob == null) { 149 return -1; 150 } else { 151 return blob.getLength(); 152 } 153 } 154 } 155 156 public int getChunkCount() { 157 if (!isChunked()) { 158 throw new NuxeoException(String.format("Cannot get chunk count of file entry %s as it is not chunked", key)); 159 } 160 return Integer.parseInt((String) params.get("chunkCount")); 161 } 162 163 public Map<Integer, String> getChunks() { 164 if (!isChunked()) { 165 throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", key)); 166 } 167 Map<Integer, String> chunks = new HashMap<>(); 168 for (String param : params.keySet()) { 169 if (NumberUtils.isDigits(param)) { 170 chunks.put(Integer.parseInt(param), (String) params.get(param)); 171 } 172 } 173 return chunks; 174 } 175 176 public List<Integer> getOrderedChunkIndexes() { 177 if (!isChunked()) { 178 throw new NuxeoException(String.format("Cannot get chunk indexes of file entry %s as it is not chunked", 179 key)); 180 } 181 List<Integer> sortedChunkIndexes = new ArrayList<Integer>(getChunks().keySet()); 182 Collections.sort(sortedChunkIndexes); 183 return sortedChunkIndexes; 184 } 185 186 public Collection<String> getChunkEntryKeys() { 187 if (!isChunked()) { 188 throw new NuxeoException(String.format("Cannot get chunk entry keys of file entry %s as it is not chunked", 189 key)); 190 } 191 return getChunks().values(); 192 } 193 194 public boolean isChunksCompleted() { 195 return getChunks().size() == getChunkCount(); 196 } 197 198 public Blob getBlob() { 199 if (isChunked()) { 200 // First check if blob chunks have already been read and concatenated 201 if (chunkedBlob != null) { 202 return chunkedBlob; 203 } 204 File tmpChunkedFile = null; 205 try { 206 Map<Integer, String> chunks = getChunks(); 207 int uploadedChunkCount = chunks.size(); 208 int chunkCount = getChunkCount(); 209 if (uploadedChunkCount != chunkCount) { 210 log.warn(String.format( 211 "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.", key, 212 uploadedChunkCount, chunkCount)); 213 return null; 214 } 215 chunkedBlob = Blobs.createBlobWithExtension(null); 216 // Temporary file made from concatenated chunks 217 tmpChunkedFile = chunkedBlob.getFile(); 218 BatchManager bm = Framework.getService(BatchManager.class); 219 TransientStore ts = bm.getTransientStore(); 220 // Sort chunk indexes and concatenate them to build the entire blob 221 List<Integer> sortedChunkIndexes = getOrderedChunkIndexes(); 222 for (int index : sortedChunkIndexes) { 223 Blob chunk = getChunk(ts, chunks.get(index)); 224 if (chunk != null) { 225 transferTo(chunk, tmpChunkedFile); 226 } 227 } 228 // Store tmpChunkedFile as a parameter for later deletion 229 ts.putParameter(key, "tmpChunkedFilePath", tmpChunkedFile.getAbsolutePath()); 230 chunkedBlob.setMimeType(getMimeType()); 231 chunkedBlob.setFilename(getFileName()); 232 return chunkedBlob; 233 } catch (IOException ioe) { 234 if (tmpChunkedFile != null && tmpChunkedFile.exists()) { 235 tmpChunkedFile.delete(); 236 } 237 chunkedBlob = null; 238 throw new NuxeoException(ioe); 239 } 240 } else { 241 return blob; 242 } 243 } 244 245 protected Blob getChunk(TransientStore ts, String key) { 246 List<Blob> blobs = ts.getBlobs(key); 247 if (CollectionUtils.isEmpty(blobs)) { 248 return null; 249 } 250 return blobs.get(0); 251 } 252 253 /** 254 * Appends the given blob to the given file. 255 */ 256 protected void transferTo(Blob blob, File file) throws IOException { 257 try (OutputStream out = new FileOutputStream(file, true)) { 258 try (InputStream in = blob.getStream()) { 259 IOUtils.copy(in, out); 260 } 261 } 262 } 263 264 public String addChunk(int index, Blob blob) { 265 if (!isChunked()) { 266 throw new NuxeoException("Cannot add a chunk to a non chunked file entry."); 267 } 268 int chunkCount = getChunkCount(); 269 if (index < 0) { 270 throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", index)); 271 } 272 if (index >= chunkCount) { 273 throw new NuxeoException(String.format( 274 "Cannot add chunk with index %d to file entry %s as chunk count is %d.", index, key, chunkCount)); 275 } 276 if (getChunks().containsKey(index)) { 277 throw new NuxeoException(String.format( 278 "Cannot add chunk with index %d to file entry %s as it already exists.", index, key)); 279 } 280 281 String chunkEntryKey = key + "_" + index; 282 BatchManager bm = Framework.getService(BatchManager.class); 283 TransientStore ts = bm.getTransientStore(); 284 ts.putBlobs(chunkEntryKey, Collections.singletonList(blob)); 285 ts.putParameter(key, String.valueOf(index), chunkEntryKey); 286 287 return chunkEntryKey; 288 } 289 290 public void beforeRemove() { 291 BatchManager bm = Framework.getService(BatchManager.class); 292 String tmpChunkedFilePath = (String) bm.getTransientStore().getParameter(key, "tmpChunkedFilePath"); 293 if (tmpChunkedFilePath != null) { 294 File tmpChunkedFile = new File(tmpChunkedFilePath); 295 if (tmpChunkedFile.exists()) { 296 log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFilePath)); 297 tmpChunkedFile.delete(); 298 } 299 } 300 } 301}