001/*
002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.blob;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.file.Path;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.ConcurrentHashMap;
028
029import javax.transaction.Status;
030import javax.transaction.Synchronization;
031import javax.transaction.SystemException;
032import javax.transaction.Transaction;
033
034import org.apache.logging.log4j.LogManager;
035import org.apache.logging.log4j.Logger;
036import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
037import org.nuxeo.ecm.core.api.NuxeoException;
038import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
039import org.nuxeo.runtime.jtajca.NuxeoContainer;
040import org.nuxeo.runtime.transaction.TransactionHelper;
041
042/**
043 * Transactional Blob Store.
044 * <p>
045 * Until the transaction is committed, blobs are stored in a transient store. Upon commit, they are sent to the
046 * permanent store.
047 * <p>
048 * It is important that a copy operation between the transient store and the permanent store be extremely fast and never fail, as it
049 * will be done during commit.
050 *
051 * @since 11.1
052 */
053public class TransactionalBlobStore extends AbstractBlobStore implements Synchronization {
054
055    private static final Logger log = LogManager.getLogger(TransactionalBlobStore.class);
056
057    public final BlobStore store;
058
059    // may be the same as the permanent store if it has versioning
060    public final BlobStore transientStore;
061
062    /**
063     * Transient data recording operations applied to a blob, to be executed on the permanent store at commit time.
064     */
065    public static class TransientInfo {
066
067        /** The key in the transient store of the blob to use, or a delete marker. */
068        public String transientKey;
069
070        /** The update to apply. */
071        public BlobUpdateContext blobUpdateContext;
072    }
073
074    protected final ThreadLocal<Map<String, TransientInfo>> transientInfo = new ThreadLocal<>();
075
076    // the keys that have been modified in any active transaction
077    protected final Map<String, Transaction> keysInActiveTransactions = new ConcurrentHashMap<>();
078
079    protected static final String DELETE_MARKER = "";
080
081    protected static boolean isDeleteMarker(String transientKey) {
082        return DELETE_MARKER.equals(transientKey);
083    }
084
085    public TransactionalBlobStore(BlobStore store, BlobStore transientStore) {
086        super("tx", store.getKeyStrategy());
087        this.store = store;
088        this.transientStore = transientStore;
089        if (store.hasVersioning() && transientStore != store) {
090            throw new NuxeoException("If the store has versioning then it must be also the transient store");
091        }
092    }
093
094    @Override
095    public BinaryGarbageCollector getBinaryGarbageCollector() {
096        return store.getBinaryGarbageCollector();
097    }
098
099    @Override
100    public boolean hasVersioning() {
101        return store.hasVersioning();
102    }
103
104    @Override
105    public BlobStore unwrap() {
106        return store.unwrap();
107    }
108
109    @Override
110    public String writeBlob(BlobWriteContext blobWriteContext)
111            throws IOException {
112        if (TransactionHelper.isTransactionActive()) {
113            String transientKey;
114            String key;
115            logTrace("group tx write");
116            if (hasVersioning()) {
117                // with versioning there can be no collisions, and transientStore = store
118                transientKey = transientStore.writeBlob(blobWriteContext);
119                key = transientKey;
120            } else {
121                // for the transient write we use a random key
122                transientKey = transientStore.writeBlob(blobWriteContext.copyWithKey(randomString()));
123                key = blobWriteContext.getKey(); // may depend on write observer, for example for digests
124            }
125            try {
126                putTransientKey(key, transientKey);
127            } catch (ConcurrentUpdateException e) {
128                // delete transient store file
129                transientStore.deleteBlob(transientKey);
130                throw e;
131            }
132            logTrace("rnote over Nuxeo: " + key);
133            logTrace("end");
134            return key;
135        } else {
136            return store.writeBlob(blobWriteContext);
137        }
138    }
139
140    @Override
141    public boolean copyBlob(String key, BlobStore sourceStore, String sourceKey, boolean atomicMove)
142            throws IOException {
143        // copyBlob only called from commit or during caching
144        throw new UnsupportedOperationException();
145    }
146
147    @Override
148    public OptionalOrUnknown<Path> getFile(String key) {
149        if (TransactionHelper.isTransactionActive()) {
150            String transientKey = getTransientKey(key);
151            if (isDeleteMarker(transientKey)) {
152                return OptionalOrUnknown.missing();
153            } else if (transientKey != null) {
154                return transientStore.getFile(transientKey);
155            }
156            // else fall through
157        }
158        // check permanent store
159        return store.getFile(key);
160    }
161
162    @Override
163    public OptionalOrUnknown<InputStream>getStream(String key) throws IOException {
164        if (TransactionHelper.isTransactionActive()) {
165            String transientKey = getTransientKey(key);
166            if (isDeleteMarker(transientKey)) {
167                return OptionalOrUnknown.missing();
168            } else if (transientKey != null) {
169                OptionalOrUnknown<InputStream> streamOpt = transientStore.getStream(transientKey);
170                if (!streamOpt.isPresent()) {
171                    log.error("Missing blob from transient blob store: " + transientKey);
172                }
173                return streamOpt;
174            }
175            // else fall through
176        }
177        // check permanent store
178        return store.getStream(key);
179    }
180
181    @Override
182    public boolean readBlob(String key, Path file) throws IOException {
183        if (TransactionHelper.isTransactionActive()) {
184            logTrace("group tx read");
185            logTrace("rnote over Nuxeo: " + key);
186            String transientKey = getTransientKey(key);
187            boolean found;
188            if (isDeleteMarker(transientKey)) {
189                logTrace("<--", "deleted");
190                found = false; // deleted in transaction
191            } else if (transientKey != null) {
192                found = transientStore.readBlob(transientKey, file);
193                if (!found) {
194                    log.error("Missing blob from transient blob store: " + transientKey);
195                }
196            } else {
197                // else check permanent store
198                found = store.readBlob(key, file);
199            }
200            logTrace("end");
201            return found;
202        }
203        // check permanent store
204        return store.readBlob(key, file);
205    }
206
207    @Override
208    public void writeBlobProperties(BlobUpdateContext blobUpdateContext) throws IOException {
209        if (TransactionHelper.isTransactionActive()) {
210            String key = blobUpdateContext.key;
211            putTransientUpdate(key, blobUpdateContext);
212        } else {
213            store.writeBlobProperties(blobUpdateContext);
214        }
215    }
216
217    @Override
218    public void deleteBlob(String key) {
219        if (TransactionHelper.isTransactionActive()) {
220            putTransientKey(key, DELETE_MARKER);
221        } else {
222            store.deleteBlob(key);
223        }
224    }
225
226    protected Transaction getTransaction() {
227        try {
228            return NuxeoContainer.getTransactionManager().getTransaction();
229        } catch (NullPointerException | SystemException e) {
230            throw new NuxeoException(e);
231        }
232    }
233
234    // ---------- Synchronization ----------
235
236    protected String getTransientKey(String key) {
237        Map<String, TransientInfo> map = transientInfo.get();
238        if (map == null) {
239            return null;
240        } else {
241            TransientInfo info = map.get(key);
242            return info == null ? null : info.transientKey;
243        }
244    }
245
246    protected void putTransientKey(String key, String transientKey) {
247        // check concurrent update
248        Transaction tx = getTransaction();
249        Transaction otherTx = keysInActiveTransactions.putIfAbsent(key, tx);
250        if (otherTx != null) {
251            if (otherTx != tx) {
252                throw new ConcurrentUpdateException(key);
253            }
254            // there may be a previous transient file
255            // it's now unneeded as we're about to overwrite it
256            String otherTransientKey = getTransientKey(key);
257            if (otherTransientKey != null && !isDeleteMarker(otherTransientKey)) {
258                transientStore.deleteBlob(otherTransientKey);
259            }
260        }
261        // put transient key
262        TransientInfo info = getTransientInfo(key);
263        info.transientKey = transientKey;
264    }
265
266    protected void putTransientUpdate(String key, BlobUpdateContext blobUpdateContext) {
267        TransientInfo info = getTransientInfo(key);
268        if (info.blobUpdateContext == null) {
269            info.blobUpdateContext = blobUpdateContext;
270        } else {
271            info.blobUpdateContext.with(blobUpdateContext);
272        }
273    }
274
275    protected TransientInfo getTransientInfo(String key) {
276        Map<String, TransientInfo> map = transientInfo.get();
277        if (map == null) {
278            map = new HashMap<>();
279            transientInfo.set(map);
280            TransactionHelper.registerSynchronization(this);
281        }
282        return map.computeIfAbsent(key, k -> new TransientInfo());
283    }
284
285    @Override
286    public void beforeCompletion() {
287        // nothing to do
288    }
289
290    @Override
291    public void afterCompletion(int status) {
292        Map<String, TransientInfo> map = transientInfo.get();
293        transientInfo.remove();
294        try {
295            if (status == Status.STATUS_COMMITTED) {
296                logTrace("== TX commit ==");
297                // move transient files to permanent store
298                for (Entry<String, TransientInfo> en : map.entrySet()) {
299                    String key = en.getKey();
300                    TransientInfo info = en.getValue();
301                    // apply create/delete
302                    String transientKey = info.transientKey;
303                    if (transientKey != null) {
304                        if (isDeleteMarker(transientKey)) {
305                            store.deleteBlob(key);
306                        } else {
307                            // with versioning, the blob already has its final key
308                            // without versioning, atomically move to permanent store
309                            if (!hasVersioning()) {
310                                try {
311                                    boolean found = store.copyBlob(key, transientStore, transientKey, true);
312                                    if (!found) {
313                                        log.error("Missing blob from transient blob store: " + transientKey
314                                                + ", failed to commit creation of file: " + key);
315                                        continue;
316                                    }
317                                } catch (IOException e) {
318                                    log.error("Failed to commit creation of blob: " + key, e);
319                                    continue;
320                                }
321                            }
322                        }
323                    }
324                    // apply updates
325                    BlobUpdateContext blobUpdateContext = info.blobUpdateContext;
326                    if (blobUpdateContext != null) {
327                        try {
328                            store.writeBlobProperties(blobUpdateContext);
329                        } catch (IOException e) {
330                            log.error("Failed to commit update of blob: " + key, e);
331                        }
332                    }
333                }
334            } else if (status == Status.STATUS_ROLLEDBACK) {
335                logTrace("== TX rollback ==");
336                // delete transient files
337                for (TransientInfo info : map.values()) {
338                    String transientKey = info.transientKey;
339                    if (transientKey != null && !isDeleteMarker(transientKey)) {
340                        transientStore.deleteBlob(transientKey);
341                    }
342                }
343            } else {
344                log.error("Unexpected afterCompletion status: " + status);
345            }
346        } finally {
347            logTrace("== TX end ==");
348            keysInActiveTransactions.keySet().removeAll(map.keySet());
349        }
350    }
351
352}