/*
 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 */
package org.nuxeo.ecm.automation.server.jaxrs.batch;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.Blobs;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.transientstore.api.TransientStore;
import org.nuxeo.runtime.api.Framework;

/**
 * Represents a batch file backed by the {@link TransientStore}.
 * <p>
 * The file can be chunked or not. If it is chunked it references its chunks as {@link TransientStore} entry keys.
 * <p>
 * For a chunked entry, the parameter map holds the {@code chunkCount}, {@code fileName}, {@code mimeType} and
 * {@code fileSize} values as strings, plus one parameter per uploaded chunk whose name is the chunk index (digits
 * only) and whose value is the transient store key of that chunk.
 *
 * @since 7.4
 * @see Batch
 */
public class BatchFileEntry {

    protected static final Log log = LogFactory.getLog(BatchFileEntry.class);

    protected String key;

    protected Map<String, Serializable> params;

    protected Blob blob;

    /** Blob built from the concatenated chunks, kept once successfully built so that chunks are read only once. */
    protected Blob chunkedBlob;

    /**
     * Creates a file entry that holds the given blob, not chunked.
     *
     * @param key the file entry key
     * @param blob the blob held by this entry
     */
    public BatchFileEntry(String key, Blob blob) {
        this(key, false);
        this.blob = blob;
    }

    /**
     * Creates a file entry that references the file chunks.
     * <p>
     * The file name and the MIME type are only recorded when they are not empty.
     *
     * @param key the file entry key
     * @param chunkCount the total number of chunks expected for the file
     * @param fileName the name of the whole file, may be empty
     * @param mimeType the MIME type of the whole file, may be empty
     * @param fileSize the size of the whole file
     */
    public BatchFileEntry(String key, int chunkCount, String fileName, String mimeType, long fileSize) {
        this(key, true);
        params.put("chunkCount", String.valueOf(chunkCount));
        if (!StringUtils.isEmpty(fileName)) {
            params.put("fileName", fileName);
        }
        if (!StringUtils.isEmpty(mimeType)) {
            params.put("mimeType", mimeType);
        }
        params.put("fileSize", String.valueOf(fileSize));
    }

    /**
     * Creates a file entry that holds the given parameters.
     * <p>
     * The map is kept by reference, it is not copied.
     *
     * @param key the file entry key
     * @param params the parameters of the file entry
     */
    public BatchFileEntry(String key, Map<String, Serializable> params) {
        this.key = key;
        this.params = params;
    }

    /**
     * Creates a file entry with a fresh parameter map that only records whether the entry is chunked.
     *
     * @param key the file entry key
     * @param chunked whether the entry is chunked
     */
    protected BatchFileEntry(String key, boolean chunked) {
        this.key = key;
        params = new HashMap<>();
        params.put(Batch.CHUNKED_PARAM_NAME, String.valueOf(chunked));
    }

    /**
     * Returns the key of this file entry.
     */
    public String getKey() {
        return key;
    }

    /**
     * Returns the parameter map of this file entry (the internal instance, not a copy).
     */
    public Map<String, Serializable> getParams() {
        return params;
    }

    /**
     * Returns {@code true} if the {@link Batch#CHUNKED_PARAM_NAME} parameter parses as {@code true}.
     */
    public boolean isChunked() {
        return Boolean.parseBoolean((String) params.get(Batch.CHUNKED_PARAM_NAME));
    }

    /**
     * Returns the file name: the {@code fileName} parameter if this entry is chunked, else the file name of the
     * held blob.
     *
     * @return the file name, or {@code null} if it is not known or if there is no blob
     */
    public String getFileName() {
        if (isChunked()) {
            return (String) params.get("fileName");
        }
        Blob currentBlob = getBlob();
        return currentBlob == null ? null : currentBlob.getFilename();
    }

    /**
     * Returns the MIME type: the {@code mimeType} parameter if this entry is chunked, else the MIME type of the
     * held blob.
     *
     * @return the MIME type, or {@code null} if it is not known or if there is no blob
     */
    public String getMimeType() {
        if (isChunked()) {
            return (String) params.get("mimeType");
        }
        Blob currentBlob = getBlob();
        return currentBlob == null ? null : currentBlob.getMimeType();
    }

    /**
     * Returns the file size: the parsed {@code fileSize} parameter if this entry is chunked, else the length of the
     * held blob.
     *
     * @return the file size, or {@code -1} if this entry is not chunked and has no blob
     * @throws NumberFormatException if this entry is chunked and the {@code fileSize} parameter is missing or is
     *             not a valid long
     */
    public long getFileSize() {
        if (isChunked()) {
            return Long.parseLong((String) params.get("fileSize"));
        }
        Blob currentBlob = getBlob();
        return currentBlob == null ? -1 : currentBlob.getLength();
    }

    /**
     * Returns the total number of chunks expected for this file entry.
     *
     * @throws NuxeoException if this entry is not chunked
     * @throws NumberFormatException if the {@code chunkCount} parameter is missing or is not a valid integer
     */
    public int getChunkCount() {
        if (!isChunked()) {
            throw new NuxeoException(
                    String.format("Cannot get chunk count of file entry %s as it is not chunked", key));
        }
        return Integer.parseInt((String) params.get("chunkCount"));
    }

    /**
     * Returns the uploaded chunks as a new map from chunk index to chunk entry key.
     * <p>
     * Every parameter whose name contains only digits is considered as a chunk.
     *
     * @throws NuxeoException if this entry is not chunked
     */
    public Map<Integer, String> getChunks() {
        if (!isChunked()) {
            throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", key));
        }
        Map<Integer, String> chunks = new HashMap<>();
        for (Map.Entry<String, Serializable> param : params.entrySet()) {
            String name = param.getKey();
            if (NumberUtils.isDigits(name)) {
                chunks.put(Integer.valueOf(name), (String) param.getValue());
            }
        }
        return chunks;
    }

    /**
     * Returns the indexes of the uploaded chunks, sorted in ascending order.
     *
     * @throws NuxeoException if this entry is not chunked
     */
    public List<Integer> getOrderedChunkIndexes() {
        if (!isChunked()) {
            throw new NuxeoException(
                    String.format("Cannot get chunk indexes of file entry %s as it is not chunked", key));
        }
        List<Integer> sortedChunkIndexes = new ArrayList<>(getChunks().keySet());
        Collections.sort(sortedChunkIndexes);
        return sortedChunkIndexes;
    }

    /**
     * Returns the chunk entry keys of the uploaded chunks, in no particular order.
     *
     * @throws NuxeoException if this entry is not chunked
     */
    public Collection<String> getChunkEntryKeys() {
        if (!isChunked()) {
            throw new NuxeoException(
                    String.format("Cannot get chunk entry keys of file entry %s as it is not chunked", key));
        }
        return getChunks().values();
    }

    /**
     * Returns {@code true} if the number of uploaded chunks equals the expected chunk count.
     *
     * @throws NuxeoException if this entry is not chunked
     */
    public boolean isChunksCompleted() {
        return getChunks().size() == getChunkCount();
    }

    /**
     * Returns the blob of this file entry.
     * <p>
     * If this entry is not chunked, the held blob is returned. Otherwise the chunks are read from the
     * {@link TransientStore} in ascending index order and appended to a temporary file; the resulting blob is kept
     * for subsequent calls and the path of the temporary file is stored as the {@code tmpChunkedFilePath} parameter
     * of this entry in the transient store, for later deletion by {@link #beforeRemove()}.
     * <p>
     * The concatenated blob is only kept once it has been entirely built: if anything fails while building it, the
     * temporary file is deleted and the next call starts again from scratch.
     *
     * @return the blob, or {@code null} if this entry is chunked and not all the chunks have been uploaded
     * @throws NuxeoException if an I/O error occurs while concatenating the chunks
     */
    public Blob getBlob() {
        if (!isChunked()) {
            return blob;
        }
        // First check if blob chunks have already been read and concatenated
        if (chunkedBlob != null) {
            return chunkedBlob;
        }
        Map<Integer, String> chunks = getChunks();
        int uploadedChunkCount = chunks.size();
        int chunkCount = getChunkCount();
        if (uploadedChunkCount != chunkCount) {
            log.warn(String.format(
                    "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.", key,
                    uploadedChunkCount, chunkCount));
            return null;
        }
        File tmpChunkedFile = null;
        boolean built = false;
        try {
            // Build into a local variable so that a failure never leaves a partial blob in the chunkedBlob field
            Blob concatenatedBlob = Blobs.createBlobWithExtension(null);
            // Temporary file made from concatenated chunks
            tmpChunkedFile = concatenatedBlob.getFile();
            BatchManager bm = Framework.getService(BatchManager.class);
            TransientStore ts = bm.getTransientStore();
            // Sort chunk indexes and concatenate them to build the entire blob
            for (int index : getOrderedChunkIndexes()) {
                String chunkEntryKey = chunks.get(index);
                Blob chunk = getChunk(ts, chunkEntryKey);
                if (chunk == null) {
                    // Kept as best-effort (the chunk is skipped) but made visible since the file will be incomplete
                    log.warn(String.format("No blob found for chunk %d (entry key %s) of file entry %s, skipping it.",
                            index, chunkEntryKey, key));
                    continue;
                }
                transferTo(chunk, tmpChunkedFile);
            }
            // Store tmpChunkedFile as a parameter for later deletion
            ts.putParameter(key, "tmpChunkedFilePath", tmpChunkedFile.getAbsolutePath());
            concatenatedBlob.setMimeType(getMimeType());
            concatenatedBlob.setFilename(getFileName());
            chunkedBlob = concatenatedBlob;
            built = true;
            return chunkedBlob;
        } catch (IOException ioe) {
            throw new NuxeoException(ioe);
        } finally {
            // Covers checked and unchecked failures alike
            if (!built) {
                deleteTmpChunkedFile(tmpChunkedFile);
            }
        }
    }

    /**
     * Returns the first blob stored in the given transient store under the given key.
     *
     * @param ts the transient store to read from
     * @param key the chunk entry key
     * @return the first blob of the entry, or {@code null} if the entry has no blobs
     */
    protected Blob getChunk(TransientStore ts, String key) {
        List<Blob> blobs = ts.getBlobs(key);
        if (CollectionUtils.isEmpty(blobs)) {
            return null;
        }
        return blobs.get(0);
    }

    /**
     * Appends the given blob to the given file.
     *
     * @param blob the blob whose content is read
     * @param file the file the content is appended to
     * @throws IOException if reading the blob or writing the file fails
     */
    protected void transferTo(Blob blob, File file) throws IOException {
        try (OutputStream out = new FileOutputStream(file, true); InputStream in = blob.getStream()) {
            IOUtils.copy(in, out);
        }
    }

    /**
     * Stores the given blob as the chunk with the given index of this file entry.
     * <p>
     * The blob is put in the {@link TransientStore} under the key {@code <entry key>_<index>} and this key is
     * registered in the transient store as the parameter named after the index on this entry's key. This method
     * does not itself update the parameter map held by this object.
     *
     * @param index the chunk index, must be between 0 inclusive and the chunk count exclusive
     * @param blob the chunk content
     * @return the transient store key of the chunk
     * @throws NuxeoException if this entry is not chunked, if the index is out of range or if a chunk with this index
     *             is already referenced by this entry
     */
    public String addChunk(int index, Blob blob) {
        if (!isChunked()) {
            throw new NuxeoException("Cannot add a chunk to a non chunked file entry.");
        }
        int chunkCount = getChunkCount();
        if (index < 0) {
            throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", index));
        }
        if (index >= chunkCount) {
            throw new NuxeoException(String.format(
                    "Cannot add chunk with index %d to file entry %s as chunk count is %d.", index, key, chunkCount));
        }
        if (getChunks().containsKey(index)) {
            throw new NuxeoException(
                    String.format("Cannot add chunk with index %d to file entry %s as it already exists.", index, key));
        }

        String chunkEntryKey = key + "_" + index;
        BatchManager bm = Framework.getService(BatchManager.class);
        TransientStore ts = bm.getTransientStore();
        ts.putBlobs(chunkEntryKey, Collections.singletonList(blob));
        ts.putParameter(key, String.valueOf(index), chunkEntryKey);

        return chunkEntryKey;
    }

    /**
     * Deletes the temporary file made from the concatenated chunks, if any.
     * <p>
     * The path of the file is read from the {@code tmpChunkedFilePath} parameter stored for this entry in the
     * {@link TransientStore}.
     */
    public void beforeRemove() {
        BatchManager bm = Framework.getService(BatchManager.class);
        String tmpChunkedFilePath = (String) bm.getTransientStore().getParameter(key, "tmpChunkedFilePath");
        if (tmpChunkedFilePath == null) {
            return;
        }
        File tmpChunkedFile = new File(tmpChunkedFilePath);
        if (tmpChunkedFile.exists()) {
            log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFilePath));
            deleteTmpChunkedFile(tmpChunkedFile);
        }
    }

    /**
     * Deletes the given temporary file if it exists, logging a warning when the deletion fails.
     *
     * @param file the file to delete, may be {@code null}
     */
    private static void deleteTmpChunkedFile(File file) {
        if (file != null && file.exists() && !file.delete()) {
            log.warn(String.format("Could not delete temporary chunked file %s", file.getAbsolutePath()));
        }
    }
}