/*
 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.blob.s3;

import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.ALLOW_BYTE_RANGE;
import static org.nuxeo.ecm.core.blob.KeyStrategy.VER_SEP;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.common.utils.RFC2231;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.NuxeoPrincipal;
import org.nuxeo.ecm.core.api.SystemPrincipal;
import org.nuxeo.ecm.core.blob.AbstractBlobGarbageCollector;
import org.nuxeo.ecm.core.blob.AbstractBlobStore;
import org.nuxeo.ecm.core.blob.BlobContext;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.BlobStore;
import org.nuxeo.ecm.core.blob.BlobStore.OptionalOrUnknown;
import org.nuxeo.ecm.core.blob.BlobUpdateContext;
import org.nuxeo.ecm.core.blob.BlobWriteContext;
import org.nuxeo.ecm.core.blob.ByteRange;
import org.nuxeo.ecm.core.blob.KeyStrategy;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.io.download.DownloadHelper;
import org.nuxeo.runtime.api.Framework;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.BucketVersioningConfiguration;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectLockLegalHold;
import com.amazonaws.services.s3.model.ObjectLockLegalHoldStatus;
import com.amazonaws.services.s3.model.ObjectLockRetention;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.RestoreObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SetObjectLegalHoldRequest;
import com.amazonaws.services.s3.model.SetObjectRetentionRequest;
import com.amazonaws.services.s3.model.VersionListing;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;

/**
 * Blob storage in S3.
 *
 * @since 11.1
 */
public class S3BlobStore extends AbstractBlobStore {

    private static final Logger log = LogManager.getLogger(S3BlobStore.class);

    // x-amz-meta-username header
    protected static final String USER_METADATA_USERNAME = "username";

    protected final S3BlobStoreConfiguration config;

    protected final AmazonS3 amazonS3;

    protected final String bucketName;

    protected final String bucketPrefix;

    protected final boolean allowByteRange;

    // Note: we may choose not to track versions in keys even in a versioned bucket,
    // for instance when we only want the bucket to record and keep old versions for us.
    /** If true, include the object version in the key. */
    protected final boolean useVersion;

    protected final BinaryGarbageCollector gc;

    public S3BlobStore(String name, S3BlobStoreConfiguration config, KeyStrategy keyStrategy) {
        super(name, keyStrategy);
        this.config = config;
        amazonS3 = config.amazonS3;
        bucketName = config.bucketName;
        bucketPrefix = config.bucketPrefix;
        allowByteRange = config.getBooleanProperty(ALLOW_BYTE_RANGE);
        useVersion = !keyStrategy.useDeDuplication() && isBucketVersioningEnabled();
        gc = new S3BlobGarbageCollector();
    }
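
    // Construction sketch (illustrative only, the real wiring is done by S3BlobProvider):
    //
    //   S3BlobStoreConfiguration config = ...; // bucket, prefix, credentials, transfer manager
    //   KeyStrategy keyStrategy = ...;         // e.g. a digest-based strategy when de-duplication is wanted
    //   BlobStore store = new S3BlobStore("s3", config, keyStrategy);
    //
    // The store name "s3" and the way config/keyStrategy are obtained are assumptions for illustration.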

    public S3BlobStore getS3BinaryManager() {
        return S3BlobStore.this;
    }

    protected static boolean isMissingKey(AmazonServiceException e) {
        return e.getStatusCode() == 404 || "NoSuchKey".equals(e.getErrorCode()) || "Not Found".equals(e.getMessage());
    }

    protected static boolean isNotImplemented(AmazonServiceException e) {
        return e.getStatusCode() == 501 || "NotImplemented".equals(e.getErrorCode());
    }

    protected boolean isBucketVersioningEnabled() {
        try {
            BucketVersioningConfiguration v = amazonS3.getBucketVersioningConfiguration(bucketName);
            // if versioning is suspended, created objects won't have versions
            return v.getStatus().equals(BucketVersioningConfiguration.ENABLED);
        } catch (AmazonServiceException e) {
            if (isNotImplemented(e)) {
                // MinIO does not implement versioning
                log.warn("Versioning not implemented for bucket: {}: {}", () -> bucketName, e::getMessage);
                log.debug(e, e);
                return false;
            }
            throw e;
        }
    }

    @Override
    public boolean hasVersioning() {
        return useVersion;
    }

    @Override
    public String writeBlob(BlobWriteContext blobWriteContext) throws IOException {

        // detect copy from another S3 blob provider, to use direct S3-level copy
        BlobContext blobContext = blobWriteContext.blobContext;
        Blob blob = blobContext.blob;
        String copiedKey = copyBlob(blob);
        if (copiedKey != null) {
            return copiedKey;
        }

        Path file;
        String fileTraceSource;
        Path tmp = null;
        try {
            Path blobWriteContextFile = blobWriteContext.getFile();
            if (blobWriteContextFile != null) {
                // we have a file, assume that the caller already observed the write
                file = blobWriteContextFile;
                fileTraceSource = "Nuxeo";
            } else {
                // no transfer to a file was done yet (no caching)
                // we may be able to use the blob's underlying file, if not pure streaming
                File blobFile = blob.getFile();
                if (blobFile != null) {
                    // use the blob's file directly
                    if (blobWriteContext.writeObserver != null) {
                        // but we must still run the writes through the write observer
                        transfer(blobWriteContext, NULL_OUTPUT_STREAM);
                    }
                    file = blobFile.toPath();
                    fileTraceSource = "Nuxeo";
                } else {
                    // we must transfer the blob stream to a tmp file
                    tmp = Files.createTempFile("bin_", ".tmp");
                    logTrace(null, "->", "tmp", "write");
                    logTrace("hnote right: " + tmp.getFileName());
                    transfer(blobWriteContext, tmp);
                    file = tmp;
                    fileTraceSource = "tmp";
                }
            }
            String key = blobWriteContext.getKey(); // may depend on write observer, for example for digests
            if (key == null) {
                // should never happen unless an invalid WriteObserver is used in new code
                throw new NuxeoException("Missing key");
            } else if (key.indexOf(VER_SEP) >= 0) {
                // should never happen unless AWS S3 changes their key format
                throw new NuxeoException(
                        "Invalid key '" + key + "', it contains the version separator '" + VER_SEP + "'");
            }
            String versionId = writeFile(key, file, blobContext, fileTraceSource);
            return versionId == null ? key : key + VER_SEP + versionId;
        } finally {
            if (tmp != null) {
                try {
                    logTrace("tmp", "-->", "tmp", "delete");
                    logTrace("hnote right: " + tmp.getFileName());
                    Files.delete(tmp);
                } catch (IOException e) {
                    log.warn(e, e);
                }
            }
        }
    }
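
    // Key format (illustrative): with a de-duplicating key strategy the returned key is just the digest,
    // e.g. "d41d8cd98f00b204e9800998ecf8427e"; when versioning is used the returned key is
    // key + VER_SEP + versionId, i.e. "<objectKey><VER_SEP><s3VersionId>". The sample digest and the
    // placeholder version id above are assumptions for illustration; the actual separator is KeyStrategy.VER_SEP.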

    /** Writes a file with the given key and returns its version id, or {@code null} if no version is used. */
    protected String writeFile(String key, Path file, BlobContext blobContext, String fileTraceSource)
            throws IOException {
        String bucketKey = bucketPrefix + key;
        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("Writing s3://" + bucketName + "/" + bucketKey);
        }

        if (getKeyStrategy().useDeDuplication() && exists(bucketKey)) {
            return null; // no key version used with deduplication
        }

        PutObjectRequest putObjectRequest;
        ObjectMetadata objectMetadata = new ObjectMetadata();
        if (config.useClientSideEncryption) {
            // client-side encryption
            putObjectRequest = new EncryptedPutObjectRequest(bucketName, bucketKey, file.toFile());
        } else {
            // server-side encryption
            putObjectRequest = new PutObjectRequest(bucketName, bucketKey, file.toFile());
            if (config.useServerSideEncryption) {
                if (isNotBlank(config.serverSideKMSKeyID)) {
                    // SSE-KMS
                    SSEAwsKeyManagementParams params = new SSEAwsKeyManagementParams(config.serverSideKMSKeyID);
                    putObjectRequest.setSSEAwsKeyManagementParams(params);
                } else {
                    // SSE-S3
                    objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                }
                // TODO SSE-C
            }
        }
        setMetadata(objectMetadata, blobContext);
        putObjectRequest.setMetadata(objectMetadata);
        logTrace(fileTraceSource, "->", null, "write " + Files.size(file) + " bytes");
        logTrace("hnote right: " + bucketKey);
        Upload upload = config.transferManager.upload(putObjectRequest);
        try {
            UploadResult uploadResult = upload.waitForUploadResult();
            // if we don't want to use versions, ignore them even though the bucket may be versioned
            String versionId = useVersion ? uploadResult.getVersionId() : null;
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("Wrote s3://" + bucketName + "/" + bucketKey + " in " + dtms + "ms");
            }
            if (versionId != null) {
                logTrace("<--", "v=" + versionId);
            }
            return versionId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        } catch (SdkBaseException e) {
            // catch SdkBaseException and not just AmazonServiceException
            throw new NuxeoException("Failed to write blob: " + key, e);
        }
    }

    protected void setMetadata(ObjectMetadata objectMetadata, BlobContext blobContext) {
        if (blobContext != null) {
            Blob blob = blobContext.blob;
            String filename = blob.getFilename();
            if (filename != null) {
                String contentDisposition = RFC2231.encodeContentDisposition(filename, false, null);
                objectMetadata.setContentDisposition(contentDisposition);
            }
            String contentType = DownloadHelper.getContentTypeHeader(blob);
            objectMetadata.setContentType(contentType);
        }
        if (config.metadataAddUsername) {
            NuxeoPrincipal principal = NuxeoPrincipal.getCurrent();
            if (principal != null && !(principal instanceof SystemPrincipal)) {
                String username = principal.getActingUser();
                if (username != null) {
                    Map<String, String> userMetadata = Collections.singletonMap(USER_METADATA_USERNAME, username);
                    objectMetadata.setUserMetadata(userMetadata);
                }
            }
        }
    }
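
    // Resulting object metadata (illustrative, assuming a blob named "report.pdf" of type application/pdf
    // uploaded by user "jdoe" with metadataAddUsername enabled):
    //
    //   Content-Disposition: <disposition>; filename*=UTF-8''report.pdf  (as encoded by RFC2231)
    //   Content-Type: application/pdf
    //   x-amz-meta-username: jdoe
    //
    // The exact Content-Disposition value depends on RFC2231.encodeContentDisposition; the values shown
    // here are assumptions for illustration only.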

    @Override
    public OptionalOrUnknown<Path> getFile(String key) {
        return OptionalOrUnknown.unknown();
    }

    @Override
    public OptionalOrUnknown<InputStream> getStream(String key) throws IOException {
        return OptionalOrUnknown.unknown();
    }

    protected boolean exists(String bucketKey) {
        try {
            amazonS3.getObjectMetadata(bucketName, bucketKey);
            if (log.isDebugEnabled()) {
                log.debug("Blob s3://" + bucketName + "/" + bucketKey + " already exists");
            }
            logTrace("<--", "exists");
            logTrace("hnote right: " + bucketKey);
            return true;
        } catch (AmazonServiceException e) {
            if (isMissingKey(e)) {
                logTrace("<--", "missing");
                logTrace("hnote right: " + bucketKey);
                return false;
            }
            throw e;
        }
    }

    // used for tests
    protected void clearBucket() {
        logTrace("group ClearBucket");
        ObjectListing list = null;
        long n = 0;
        do {
            if (list == null) {
                logTrace("->", "listObjects");
                list = amazonS3.listObjects(bucketName);
            } else {
                list = amazonS3.listNextBatchOfObjects(list);
            }
            for (S3ObjectSummary summary : list.getObjectSummaries()) {
                amazonS3.deleteObject(bucketName, summary.getKey());
                n++;
            }
        } while (list.isTruncated());
        if (n > 0) {
            logTrace("loop " + n + " objects");
            logTrace("->", "deleteObject");
            logTrace("end");
        }
        VersionListing vlist = null;
        long vn = 0;
        do {
            if (vlist == null) {
                logTrace("->", "listVersions");
                vlist = amazonS3.listVersions(new ListVersionsRequest().withBucketName(bucketName));
            } else {
                vlist = amazonS3.listNextBatchOfVersions(vlist);
            }
            for (S3VersionSummary vsummary : vlist.getVersionSummaries()) {
                amazonS3.deleteVersion(bucketName, vsummary.getKey(), vsummary.getVersionId());
                vn++;
            }
        } while (vlist.isTruncated());
        if (vn > 0) {
            logTrace("loop " + vn + " versions");
            logTrace("->", "deleteVersion");
            logTrace("end");
        }
        logTrace("end");
    }

    @Override
    public boolean readBlob(String key, Path dest) throws IOException {
        ByteRange byteRange;
        if (allowByteRange) {
            MutableObject<String> keyHolder = new MutableObject<>(key);
            byteRange = getByteRangeFromKey(keyHolder);
            key = keyHolder.getValue();
        } else {
            byteRange = null;
        }
        String objectKey;
        String versionId;
        int seppos;
        if (useVersion && (seppos = key.indexOf(VER_SEP)) > 0) {
            objectKey = key.substring(0, seppos);
            versionId = key.substring(seppos + 1);
        } else {
            objectKey = key;
            versionId = null;
        }
        String bucketKey = bucketPrefix + objectKey;
        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("Reading s3://" + bucketName + "/" + bucketKey);
        }
        try {
            GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, bucketKey, versionId);
            if (byteRange != null) {
                getObjectRequest.setRange(byteRange.getStart(), byteRange.getEnd());
            }
            Download download = config.transferManager.download(getObjectRequest, dest.toFile());
            download.waitForCompletion();
            logTrace("<-", "read " + Files.size(dest) + " bytes");
            logTrace("hnote right: " + bucketKey + (versionId == null ? "" : " v=" + versionId));
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("Read s3://" + bucketName + "/" + bucketKey + " in " + dtms + "ms");
            }
            if (config.useClientSideEncryption) {
                // can't efficiently check the decrypted digest
                return true;
            }
            if (config.useServerSideEncryption && isNotBlank(config.serverSideKMSKeyID)) {
                // can't get digest from key when using KMS
                return true;
            }
            if (byteRange != null) {
                // can't check digest if we have a byte range
                return true;
            }
            String expectedDigest = getKeyStrategy().getDigestFromKey(objectKey);
            if (expectedDigest != null) {
                checkDigest(expectedDigest, download, dest);
            }
            // else nothing to compare to, key is not digest-based
            return true;
        } catch (AmazonServiceException e) {
            if (isMissingKey(e)) {
                logTrace("<--", "missing");
                logTrace("hnote right: " + bucketKey + (versionId == null ? "" : " v=" + versionId));
                if (log.isDebugEnabled()) {
                    log.debug("Blob s3://" + bucketName + "/" + bucketKey + " does not exist");
                }
                return false;
            }
            throw new IOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
    }

    protected void checkDigest(String expectedDigest, Download download, Path file) throws IOException {
        if (!expectedDigest.equals(download.getObjectMetadata().getETag())) {
            // if our digest algorithm is not MD5 (so the ETag can never match),
            // or in case of a multipart upload (where the ETag may not be the MD5),
            // check the object integrity manually
            // TODO this is costly and it should be possible to deactivate it
            String digest = new DigestUtils(config.digestConfiguration.digestAlgorithm).digestAsHex(file.toFile());
            if (!digest.equals(expectedDigest)) {
                String msg = "Invalid S3 object digest, expected=" + expectedDigest + " actual=" + digest;
                log.warn(msg);
                throw new IOException(msg);
            }
        }
    }
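
    // Background note (illustrative): for S3 multipart uploads the ETag is not the MD5 of the full content
    // but typically has the form "<hex>-<partCount>", e.g. "9b2cf535f27731c974343645a3985328-2" (made-up
    // value), so the fast ETag comparison above fails and the digest is recomputed from the local file.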

    /**
     * Copies the blob as a direct S3 operation, if possible.
     *
     * @return the key, or {@code null} if no copy was done
     */
    protected String copyBlob(Blob blob) throws IOException {
        if (!(blob instanceof ManagedBlob)) {
            // not a managed blob
            return null;
        }
        ManagedBlob managedBlob = (ManagedBlob) blob;
        BlobProvider blobProvider = Framework.getService(BlobManager.class)
                                             .getBlobProvider(managedBlob.getProviderId());
        if (!getKeyStrategy().useDeDuplication()) {
            // key is not digest-based, so we don't know the destination key
            return null;
        }
        if (!(blobProvider instanceof S3BlobProvider)) {
            // not an S3 blob provider
            return null;
        }
        S3BlobStore sourceStore = (S3BlobStore) ((S3BlobProvider) blobProvider).store.unwrap();
        if (!sourceStore.getKeyStrategy().equals(getKeyStrategy())) {
            // not the same key strategy (for instance a different digest algorithm)
            return null;
        }
        // use S3-level copy as the source blob provider is also S3
        String sourceKey = stripBlobKeyPrefix(managedBlob.getKey());
        String key = sourceKey; // because same key strategy
        boolean found = copyBlob(key, sourceStore, sourceKey, false);
        if (!found) {
            throw new IOException("Cannot find source blob: " + sourceKey);
        }
        return key;
    }

    @Override
    public boolean copyBlobIsOptimized(BlobStore sourceStore) {
        return sourceStore instanceof S3BlobStore;
    }

    @Override
    public boolean copyBlob(String key, BlobStore sourceStore, String sourceKey, boolean atomicMove)
            throws IOException {
        BlobStore unwrappedSourceStore = sourceStore.unwrap();
        if (unwrappedSourceStore instanceof S3BlobStore) {
            // attempt direct S3-level copy
            S3BlobStore sourceS3BlobStore = (S3BlobStore) unwrappedSourceStore;
            try {
                boolean copied = copyBlob(key, sourceS3BlobStore, sourceKey, atomicMove);
                if (copied) {
                    return true;
                }
            } catch (AmazonServiceException e) {
                if (isMissingKey(e)) {
                    logTrace("<--", "missing");
                    // source not found
                    return false;
                }
                throw new IOException(e);
            }
            // fall through if not copied
        }
        return copyBlobGeneric(key, sourceStore, sourceKey, atomicMove);
    }

    /**
     * @return {@code false} if generic copy is needed
     * @throws AmazonServiceException if the source is missing
     */
    protected boolean copyBlob(String key, S3BlobStore sourceBlobStore, String sourceKey, boolean move)
            throws AmazonServiceException { // NOSONAR
        String sourceBucketName = sourceBlobStore.bucketName;
        String sourceBucketKey = sourceBlobStore.bucketPrefix + sourceKey;
        String bucketKey = bucketPrefix + key;

        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("Copying s3://" + sourceBucketName + "/" + sourceBucketKey + " to s3://" + bucketName + "/"
                    + bucketKey);
        }

        if (getKeyStrategy().useDeDuplication() && exists(bucketKey)) {
            return true;
        }

        // copy the blob
        logTrace("->", "getObjectMetadata");
        ObjectMetadata sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceBucketKey);
        // don't catch AmazonServiceException if missing, caller will do it
        long length = sourceMetadata.getContentLength();
        logTrace("<-", length + " bytes");
        try {
            copyBlob(sourceBlobStore.config, sourceBucketKey, config, bucketKey, move);
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("Copied s3://" + sourceBucketName + "/" + sourceBucketKey + " to s3://" + bucketName + "/"
                        + bucketKey + " in " + dtms + "ms");
            }
            return true;
        } catch (AmazonServiceException e) {
            logTrace("<--", "ERROR");
            String message = "Direct copy failed from s3://" + sourceBucketName + "/" + sourceBucketKey + " to s3://"
                    + bucketName + "/" + bucketKey + " (" + length + " bytes)";
            log.warn(message + ", falling back to slow copy: " + e.getMessage());
            log.debug(message, e);
            return false;
        }
    }

    protected void copyBlob(S3BlobStoreConfiguration sourceConfig, String sourceKey,
            S3BlobStoreConfiguration destinationConfig, String destinationKey, boolean move) {
        CopyObjectRequest copyObjectRequest = new CopyObjectRequest(sourceConfig.bucketName, sourceKey,
                destinationConfig.bucketName, destinationKey);
        if (destinationConfig.useServerSideEncryption) {
            // server-side encryption
            if (isNotBlank(destinationConfig.serverSideKMSKeyID)) {
                // SSE-KMS
                SSEAwsKeyManagementParams params = new SSEAwsKeyManagementParams(destinationConfig.serverSideKMSKeyID);
                copyObjectRequest.setSSEAwsKeyManagementParams(params);
            } else {
                // SSE-S3
                ObjectMetadata newObjectMetadata = new ObjectMetadata();
                newObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                // CopyCallable.populateMetadataWithEncryptionParams will get the rest from the source
                copyObjectRequest.setNewObjectMetadata(newObjectMetadata);
            }
            // TODO SSE-C
        }
        logTrace("->", "copyObject");
        logTrace("hnote right: " + sourceKey + " to " + destinationKey);
        Copy copy = destinationConfig.transferManager.copy(copyObjectRequest, sourceConfig.amazonS3, null);
        try {
            copy.waitForCompletion();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
        logTrace("<--", "copied");
        if (move) {
            logTrace("->", "deleteObject");
            logTrace("hnote right: " + sourceKey);
            amazonS3.deleteObject(sourceConfig.bucketName, sourceKey);
        }
    }
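
    // Note: the copy is submitted through the destination TransferManager while the source AmazonS3 client
    // is passed explicitly, so the source object metadata can be read even when source and destination use
    // different buckets or configurations, as far as the S3 permissions in place allow it.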

    protected boolean copyBlobGeneric(String key, BlobStore sourceStore, String sourceKey, boolean atomicMove)
            throws IOException {
        Path tmp = null;
        try {
            OptionalOrUnknown<Path> fileOpt = sourceStore.getFile(sourceKey);
            Path file;
            String fileTraceSource;
            if (fileOpt.isPresent()) {
                file = fileOpt.get();
                fileTraceSource = sourceStore.getName();
            } else {
                // no local file available, read from source
                tmp = Files.createTempFile("bin_", ".tmp");
                logTrace(null, "->", "tmp", "write");
                logTrace("hnote right: " + tmp.getFileName());
                boolean found = sourceStore.readBlob(sourceKey, tmp);
                if (!found) {
                    return false;
                }
                file = tmp;
                fileTraceSource = "tmp";
            }
            String versionId = writeFile(key, file, null, fileTraceSource); // always atomic
            if (versionId != null) {
                throw new NuxeoException("Cannot copy blob if store has versioning");
            }
            if (atomicMove) {
                sourceStore.deleteBlob(sourceKey);
            }
            return true;
        } finally {
            if (tmp != null) {
                try {
                    logTrace("tmp", "-->", "tmp", "delete");
                    logTrace("hnote right: " + tmp.getFileName());
                    Files.delete(tmp);
                } catch (IOException e) {
                    log.warn(e, e);
                }
            }
        }
    }

    @Override
    public void writeBlobProperties(BlobUpdateContext blobUpdateContext) throws IOException {
        String key = blobUpdateContext.key;
        String objectKey;
        String versionId;
        int seppos = key.indexOf(VER_SEP);
        if (seppos < 0) {
            objectKey = key;
            versionId = null;
        } else {
            objectKey = key.substring(0, seppos);
            versionId = key.substring(seppos + 1);
        }
        String bucketKey = bucketPrefix + objectKey;
        try {
            if (blobUpdateContext.updateRetainUntil != null) {
                if (versionId == null) {
                    throw new IOException("Cannot set retention on non-versioned blob");
                }
                Calendar retainUntil = blobUpdateContext.updateRetainUntil.retainUntil;
                Date retainUntilDate = retainUntil == null ? null : retainUntil.getTime();
                ObjectLockRetention retention = new ObjectLockRetention();
                retention.withMode(config.retentionMode) //
                         .withRetainUntilDate(retainUntilDate);
                SetObjectRetentionRequest request = new SetObjectRetentionRequest();
                request.withBucketName(bucketName) //
                       .withKey(bucketKey)
                       .withVersionId(versionId)
                       .withRetention(retention);
                logTrace("->", "setObjectRetention");
                logTrace("hnote right: " + bucketKey + " v=" + versionId);
                logTrace("rnote right: " + (retainUntil == null ? "null" : retainUntil.toInstant().toString()));
                amazonS3.setObjectRetention(request);
            }
            if (blobUpdateContext.updateLegalHold != null) {
                if (versionId == null) {
                    throw new IOException("Cannot set legal hold on non-versioned blob");
                }
                boolean hold = blobUpdateContext.updateLegalHold.hold;
                ObjectLockLegalHoldStatus status = hold ? ObjectLockLegalHoldStatus.ON : ObjectLockLegalHoldStatus.OFF;
                ObjectLockLegalHold legalHold = new ObjectLockLegalHold().withStatus(status);
                SetObjectLegalHoldRequest request = new SetObjectLegalHoldRequest();
                request.withBucketName(bucketName) //
                       .withKey(bucketKey)
                       .withVersionId(versionId)
                       .withLegalHold(legalHold);
                logTrace("->", "setObjectLegalHold");
                logTrace("hnote right: " + bucketKey + " v=" + versionId);
                logTrace("rnote right: " + status.toString());
                amazonS3.setObjectLegalHold(request);
            }
            if (blobUpdateContext.restoreForDuration != null) {
                Duration duration = blobUpdateContext.restoreForDuration.duration;
                // round up duration to a whole number of days
                int days = (int) duration.plusDays(1).minusSeconds(1).toDays();
                RestoreObjectRequest request = new RestoreObjectRequest(bucketName, bucketKey, days).withVersionId(
                        versionId);
                amazonS3.restoreObjectV2(request);
            }
        } catch (AmazonServiceException e) {
            if (isMissingKey(e)) {
                logTrace("<--", "missing");
                if (log.isDebugEnabled()) {
                    log.debug("Blob s3://" + bucketName + "/" + bucketKey + " does not exist");
                }
            }
            throw new IOException(e);
        }
    }
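
    // Worked rounding example for restoreForDuration (illustration): Duration.ofHours(25) gives
    // 25h + 24h - 1s = 48h59m59s, so toDays() = 2 days; Duration.ofHours(24) gives 47h59m59s, so 1 day.
    // Any duration of at least one second therefore requests at least one day of restore.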

    @Override
    public void deleteBlob(String key) {
        String objectKey;
        String versionId;
        int seppos = key.indexOf(VER_SEP);
        if (seppos < 0) {
            objectKey = key;
            versionId = null;
        } else {
            objectKey = key.substring(0, seppos);
            versionId = key.substring(seppos + 1);
        }
        String bucketKey = bucketPrefix + objectKey;
        try {
            if (versionId == null) {
                logTrace("->", "deleteObject");
                logTrace("hnote right: " + bucketKey);
                amazonS3.deleteObject(bucketName, bucketKey);
            } else {
                logTrace("->", "deleteVersion");
                logTrace("hnote right: " + bucketKey + " v=" + versionId);
                amazonS3.deleteVersion(bucketName, bucketKey, versionId);
            }
        } catch (AmazonServiceException e) {
            if (isMissingKey(e)) {
                logTrace("<--", "missing");
            } else {
                log.warn(e, e);
            }
        }
    }
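
    // Note (S3 semantics): on a versioned bucket, deleteObject without a version id only adds a delete
    // marker, while deleteVersion permanently removes the given version. Callers that need hard deletion
    // of versioned blobs must therefore pass keys that include the version id.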

    @Override
    public BinaryGarbageCollector getBinaryGarbageCollector() {
        return gc;
    }

    /**
     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
     */
    public class S3BlobGarbageCollector extends AbstractBlobGarbageCollector {

        @Override
        public String getId() {
            return "s3:" + bucketName + "/" + bucketPrefix;
        }

        @Override
        public Set<String> getUnmarkedBlobsAndUpdateStatus() {
            // list S3 objects in the bucket
            // record those not marked
            boolean useDeDuplication = keyStrategy.useDeDuplication();
            Set<String> unmarked = new HashSet<>();
            ObjectListing list = null;
            int prefixLength = bucketPrefix.length();
            logTrace("->", "listObjects");
            do {
                if (list == null) {
                    // use delimiter to avoid useless listing of objects in "subdirectories"
                    ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName, bucketPrefix, null,
                            S3BlobStoreConfiguration.DELIMITER, null);
                    list = amazonS3.listObjects(listObjectsRequest);
                } else {
                    list = amazonS3.listNextBatchOfObjects(list);
                }
                for (S3ObjectSummary summary : list.getObjectSummaries()) {
                    String key = summary.getKey().substring(prefixLength);
                    if (useDeDuplication && !config.digestConfiguration.isValidDigest(key)) {
                        // ignore files that cannot be digests, for safety
                        continue;
                    }
                    long length = summary.getSize();
                    if (marked.contains(key)) {
                        status.numBinaries++;
                        status.sizeBinaries += length;
                    } else {
                        status.numBinariesGC++;
                        status.sizeBinariesGC += length;
                        // record file to delete
                        unmarked.add(key);
                    }
                }
            } while (list.isTruncated());
            logTrace("<--", (status.numBinaries + status.numBinariesGC) + " objects");
            return unmarked;
        }

        @Override
        public void removeBlobs(Set<String> keys) {
            keys.forEach(S3BlobStore.this::deleteBlob);
        }
    }

}