001/* 002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.blob; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.nio.file.Path; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.Map.Entry; 027import java.util.concurrent.ConcurrentHashMap; 028 029import javax.transaction.Status; 030import javax.transaction.Synchronization; 031import javax.transaction.SystemException; 032import javax.transaction.Transaction; 033 034import org.apache.logging.log4j.LogManager; 035import org.apache.logging.log4j.Logger; 036import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 037import org.nuxeo.ecm.core.api.NuxeoException; 038import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 039import org.nuxeo.runtime.jtajca.NuxeoContainer; 040import org.nuxeo.runtime.transaction.TransactionHelper; 041 042/** 043 * Transactional Blob Store. 044 * <p> 045 * Until the transaction is committed, blobs are stored in a transient store. Upon commit, they are sent to the 046 * permanent store. 047 * <p> 048 * It is important that a copy operation between the transient store and the permanent store be extremely fast and never fail, as it 049 * will be done during commit. 050 * 051 * @since 11.1 052 */ 053public class TransactionalBlobStore extends AbstractBlobStore implements Synchronization { 054 055 private static final Logger log = LogManager.getLogger(TransactionalBlobStore.class); 056 057 public final BlobStore store; 058 059 // may be the same as the permanent store if it has versioning 060 public final BlobStore transientStore; 061 062 /** 063 * Transient data recording operations applied to a blob, to be executed on the permanent store at commit time. 064 */ 065 public static class TransientInfo { 066 067 /** The key in the transient store of the blob to use, or a delete marker. */ 068 public String transientKey; 069 070 /** The update to apply. */ 071 public BlobUpdateContext blobUpdateContext; 072 } 073 074 protected final ThreadLocal<Map<String, TransientInfo>> transientInfo = new ThreadLocal<>(); 075 076 // the keys that have been modified in any active transaction 077 protected final Map<String, Transaction> keysInActiveTransactions = new ConcurrentHashMap<>(); 078 079 protected static final String DELETE_MARKER = ""; 080 081 protected static boolean isDeleteMarker(String transientKey) { 082 return DELETE_MARKER.equals(transientKey); 083 } 084 085 public TransactionalBlobStore(BlobStore store, BlobStore transientStore) { 086 super("tx", store.getKeyStrategy()); 087 this.store = store; 088 this.transientStore = transientStore; 089 if (store.hasVersioning() && transientStore != store) { 090 throw new NuxeoException("If the store has versioning then it must be also the transient store"); 091 } 092 } 093 094 @Override 095 public BinaryGarbageCollector getBinaryGarbageCollector() { 096 return store.getBinaryGarbageCollector(); 097 } 098 099 @Override 100 public boolean hasVersioning() { 101 return store.hasVersioning(); 102 } 103 104 @Override 105 public BlobStore unwrap() { 106 return store.unwrap(); 107 } 108 109 @Override 110 public String writeBlob(BlobWriteContext blobWriteContext) 111 throws IOException { 112 if (TransactionHelper.isTransactionActive()) { 113 String transientKey; 114 String key; 115 logTrace("group tx write"); 116 if (hasVersioning()) { 117 // with versioning there can be no collisions, and transientStore = store 118 transientKey = transientStore.writeBlob(blobWriteContext); 119 key = transientKey; 120 } else { 121 // for the transient write we use a random key 122 transientKey = transientStore.writeBlob(blobWriteContext.copyWithKey(randomString())); 123 key = blobWriteContext.getKey(); // may depend on write observer, for example for digests 124 } 125 try { 126 putTransientKey(key, transientKey); 127 } catch (ConcurrentUpdateException e) { 128 // delete transient store file 129 transientStore.deleteBlob(transientKey); 130 throw e; 131 } 132 logTrace("rnote over Nuxeo: " + key); 133 logTrace("end"); 134 return key; 135 } else { 136 return store.writeBlob(blobWriteContext); 137 } 138 } 139 140 @Override 141 public boolean copyBlob(String key, BlobStore sourceStore, String sourceKey, boolean atomicMove) 142 throws IOException { 143 // copyBlob only called from commit or during caching 144 throw new UnsupportedOperationException(); 145 } 146 147 @Override 148 public OptionalOrUnknown<Path> getFile(String key) { 149 if (TransactionHelper.isTransactionActive()) { 150 String transientKey = getTransientKey(key); 151 if (isDeleteMarker(transientKey)) { 152 return OptionalOrUnknown.missing(); 153 } else if (transientKey != null) { 154 return transientStore.getFile(transientKey); 155 } 156 // else fall through 157 } 158 // check permanent store 159 return store.getFile(key); 160 } 161 162 @Override 163 public OptionalOrUnknown<InputStream>getStream(String key) throws IOException { 164 if (TransactionHelper.isTransactionActive()) { 165 String transientKey = getTransientKey(key); 166 if (isDeleteMarker(transientKey)) { 167 return OptionalOrUnknown.missing(); 168 } else if (transientKey != null) { 169 OptionalOrUnknown<InputStream> streamOpt = transientStore.getStream(transientKey); 170 if (!streamOpt.isPresent()) { 171 log.error("Missing blob from transient blob store: " + transientKey); 172 } 173 return streamOpt; 174 } 175 // else fall through 176 } 177 // check permanent store 178 return store.getStream(key); 179 } 180 181 @Override 182 public boolean readBlob(String key, Path file) throws IOException { 183 if (TransactionHelper.isTransactionActive()) { 184 logTrace("group tx read"); 185 logTrace("rnote over Nuxeo: " + key); 186 String transientKey = getTransientKey(key); 187 boolean found; 188 if (isDeleteMarker(transientKey)) { 189 logTrace("<--", "deleted"); 190 found = false; // deleted in transaction 191 } else if (transientKey != null) { 192 found = transientStore.readBlob(transientKey, file); 193 if (!found) { 194 log.error("Missing blob from transient blob store: " + transientKey); 195 } 196 } else { 197 // else check permanent store 198 found = store.readBlob(key, file); 199 } 200 logTrace("end"); 201 return found; 202 } 203 // check permanent store 204 return store.readBlob(key, file); 205 } 206 207 @Override 208 public void writeBlobProperties(BlobUpdateContext blobUpdateContext) throws IOException { 209 if (TransactionHelper.isTransactionActive()) { 210 String key = blobUpdateContext.key; 211 putTransientUpdate(key, blobUpdateContext); 212 } else { 213 store.writeBlobProperties(blobUpdateContext); 214 } 215 } 216 217 @Override 218 public void deleteBlob(String key) { 219 if (TransactionHelper.isTransactionActive()) { 220 putTransientKey(key, DELETE_MARKER); 221 } else { 222 store.deleteBlob(key); 223 } 224 } 225 226 protected Transaction getTransaction() { 227 try { 228 return NuxeoContainer.getTransactionManager().getTransaction(); 229 } catch (NullPointerException | SystemException e) { 230 throw new NuxeoException(e); 231 } 232 } 233 234 // ---------- Synchronization ---------- 235 236 protected String getTransientKey(String key) { 237 Map<String, TransientInfo> map = transientInfo.get(); 238 if (map == null) { 239 return null; 240 } else { 241 TransientInfo info = map.get(key); 242 return info == null ? null : info.transientKey; 243 } 244 } 245 246 protected void putTransientKey(String key, String transientKey) { 247 // check concurrent update 248 Transaction tx = getTransaction(); 249 Transaction otherTx = keysInActiveTransactions.putIfAbsent(key, tx); 250 if (otherTx != null) { 251 if (otherTx != tx) { 252 throw new ConcurrentUpdateException(key); 253 } 254 // there may be a previous transient file 255 // it's now unneeded as we're about to overwrite it 256 String otherTransientKey = getTransientKey(key); 257 if (otherTransientKey != null && !isDeleteMarker(otherTransientKey)) { 258 transientStore.deleteBlob(otherTransientKey); 259 } 260 } 261 // put transient key 262 TransientInfo info = getTransientInfo(key); 263 info.transientKey = transientKey; 264 } 265 266 protected void putTransientUpdate(String key, BlobUpdateContext blobUpdateContext) { 267 TransientInfo info = getTransientInfo(key); 268 if (info.blobUpdateContext == null) { 269 info.blobUpdateContext = blobUpdateContext; 270 } else { 271 info.blobUpdateContext.with(blobUpdateContext); 272 } 273 } 274 275 protected TransientInfo getTransientInfo(String key) { 276 Map<String, TransientInfo> map = transientInfo.get(); 277 if (map == null) { 278 map = new HashMap<>(); 279 transientInfo.set(map); 280 TransactionHelper.registerSynchronization(this); 281 } 282 return map.computeIfAbsent(key, k -> new TransientInfo()); 283 } 284 285 @Override 286 public void beforeCompletion() { 287 // nothing to do 288 } 289 290 @Override 291 public void afterCompletion(int status) { 292 Map<String, TransientInfo> map = transientInfo.get(); 293 transientInfo.remove(); 294 try { 295 if (status == Status.STATUS_COMMITTED) { 296 logTrace("== TX commit =="); 297 // move transient files to permanent store 298 for (Entry<String, TransientInfo> en : map.entrySet()) { 299 String key = en.getKey(); 300 TransientInfo info = en.getValue(); 301 // apply create/delete 302 String transientKey = info.transientKey; 303 if (transientKey != null) { 304 if (isDeleteMarker(transientKey)) { 305 store.deleteBlob(key); 306 } else { 307 // with versioning, the blob already has its final key 308 // without versioning, atomically move to permanent store 309 if (!hasVersioning()) { 310 try { 311 boolean found = store.copyBlob(key, transientStore, transientKey, true); 312 if (!found) { 313 log.error("Missing blob from transient blob store: " + transientKey 314 + ", failed to commit creation of file: " + key); 315 continue; 316 } 317 } catch (IOException e) { 318 log.error("Failed to commit creation of blob: " + key, e); 319 continue; 320 } 321 } 322 } 323 } 324 // apply updates 325 BlobUpdateContext blobUpdateContext = info.blobUpdateContext; 326 if (blobUpdateContext != null) { 327 try { 328 store.writeBlobProperties(blobUpdateContext); 329 } catch (IOException e) { 330 log.error("Failed to commit update of blob: " + key, e); 331 } 332 } 333 } 334 } else if (status == Status.STATUS_ROLLEDBACK) { 335 logTrace("== TX rollback =="); 336 // delete transient files 337 for (TransientInfo info : map.values()) { 338 String transientKey = info.transientKey; 339 if (transientKey != null && !isDeleteMarker(transientKey)) { 340 transientStore.deleteBlob(transientKey); 341 } 342 } 343 } else { 344 log.error("Unexpected afterCompletion status: " + status); 345 } 346 } finally { 347 logTrace("== TX end =="); 348 keysInActiveTransactions.keySet().removeAll(map.keySet()); 349 } 350 } 351 352}