/*
 * (C) Copyright 2011-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Mathieu Guillaume
 *     Florent Guillaume
 *     Luís Duarte
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.blob.s3.S3BlobStoreConfiguration.DISABLE_PROXY_PROPERTY;
import static org.nuxeo.ecm.blob.s3.S3BlobStoreConfiguration.MULTIPART_CLEANUP_DISABLED_PROPERTY;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServletRequest;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
import org.nuxeo.ecm.blob.s3.S3ManagedTransfer;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.aws.NuxeoAWSRegionProvider;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;

/**
 * A Binary Manager that stores binaries as S3 BLOBs.
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
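 * <p>
 * A minimal configuration sketch (illustrative values only; the property names are the constants defined below,
 * prefixed with {@code nuxeo.s3storage}):
 *
 * <pre>{@code
 * nuxeo.s3storage.bucket=my-bucket
 * nuxeo.s3storage.region=eu-west-1
 * nuxeo.s3storage.awsid=my-access-key-id
 * nuxeo.s3storage.awssecret=my-secret-access-key
 * }</pre>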
 */
public class S3BinaryManager extends AbstractCloudBinaryManager implements S3ManagedTransfer {

    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";

    public static final String BUCKET_NAME_PROPERTY = "bucket";

    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

    public static final String BUCKET_REGION_PROPERTY = "region";

    public static final String AWS_ID_PROPERTY = "awsid";

    public static final String AWS_SECRET_PROPERTY = "awssecret";

    /**
     * @since 10.10
     */
    public static final String AWS_SESSION_TOKEN_PROPERTY = "awstoken";

    /** AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_PROPERTY = "connection.max";

    /** AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";

    /** AWS ClientConfiguration default 10*1000 = 10s */
    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";

    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";

    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";

    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";

    public static final String SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY = "crypt.kms.key";

    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";

    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";

    public static final String ENDPOINT_PROPERTY = "endpoint";

    /**
     * @since 10.3
     */
    public static final String PATHSTYLEACCESS_PROPERTY = "pathstyleaccess";

    /** @since 11.1 */
    public static final String ACCELERATE_MODE_PROPERTY = "accelerateMode";

    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";

    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";

    public static final String DELIMITER = "/";

    /** @deprecated since 11.1, now unused */
    @Deprecated
    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    protected String bucketName;

    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    protected EncryptionMaterials encryptionMaterials;

    protected boolean isEncrypted;

    protected CryptoConfiguration cryptoConfiguration;

    protected boolean useServerSideEncryption;

    protected String serverSideKMSKeyID;

    protected AmazonS3 amazonS3;

    protected TransferManager transferManager;

    @Override
    public void close() {
        // this also shuts down the AmazonS3Client
        transferManager.shutdownNow();
        super.close();
    }

    /**
     * Aborts uploads that crashed and are older than 1 day.
     *
     * @since 7.2
     */
    protected void abortOldUploads() {
        if (getBooleanProperty(MULTIPART_CLEANUP_DISABLED_PROPERTY)) {
            log.debug("Cleanup of old multipart uploads is disabled");
            return;
        }
        // Async to avoid issues with transferManager.abortMultipartUploads taking a very long time.
        // See NXP-28571.
        new Thread(this::abortOldMultipartUploadsInternal, "Nuxeo-S3-abortOldMultipartUploads-" + bucketName).start();
    }

    // executed in a separate thread
    protected void abortOldMultipartUploadsInternal() {
        int oneDay = 1000 * 60 * 60 * 24;
        try {
            log.debug("Starting cleanup of old multipart uploads for bucket: " + bucketName);
            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
            log.debug("Cleanup done for bucket: " + bucketName);
        } catch (AmazonS3Exception e) {
            if (e.getStatusCode() == 400 || e.getStatusCode() == 404) {
                log.error("Your cloud provider does not support aborting old uploads");
                return;
            }
            throw new NuxeoException("Failed to abort old uploads", e);
        }
    }

    @Override
    protected void setupCloudClient() throws IOException {
        // Get settings from the configuration
        bucketName = getProperty(BUCKET_NAME_PROPERTY);
        // as bucket prefix is optional we don't want to use the fallback mechanism
        bucketNamePrefix = StringUtils.defaultString(properties.get(BUCKET_PREFIX_PROPERTY));
        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
        if (isBlank(bucketRegion)) {
            bucketRegion = NuxeoAWSRegionProvider.getInstance().getRegion();
        }
        String awsID = getProperty(AWS_ID_PROPERTY);
        String awsSecret = getProperty(AWS_SECRET_PROPERTY);
        String awsToken = getProperty(AWS_SESSION_TOKEN_PROPERTY);

        boolean proxyDisabled = Framework.isBooleanPropertyTrue(DISABLE_PROXY_PROPERTY);
        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);

        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);

        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
        String endpoint = getProperty(ENDPOINT_PROPERTY);
        boolean accelerateModeEnabled = getBooleanProperty(ACCELERATE_MODE_PROPERTY);
        boolean pathStyleAccessEnabled = getBooleanProperty(PATHSTYLEACCESS_PROPERTY);
        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
        if (isNotBlank(sseprop)) {
            useServerSideEncryption = Boolean.parseBoolean(sseprop);
            serverSideKMSKeyID = getProperty(SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY);
        }

        if (isBlank(bucketName)) {
            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
        }

        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith(DELIMITER)) {
            log.debug(String.format("S3 bucket prefix (%s=%s) should end with '/'; adding it automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
            bucketNamePrefix += DELIMITER;
        }
        if (isNotBlank(namespace)) {
            // use namespace as an additional prefix
            bucketNamePrefix += namespace;
            if (!bucketNamePrefix.endsWith(DELIMITER)) {
                bucketNamePrefix += DELIMITER;
            }
        }

        // set up credentials
        awsCredentialsProvider = S3Utils.getAWSCredentialsProvider(awsID, awsSecret, awsToken);

        // set up client configuration
        clientConfiguration = new ClientConfiguration();
        if (!proxyDisabled) {
            if (isNotBlank(proxyHost)) {
                clientConfiguration.setProxyHost(proxyHost);
            }
            if (isNotBlank(proxyPort)) {
                clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
            }
            if (isNotBlank(proxyLogin)) {
                clientConfiguration.setProxyUsername(proxyLogin);
            }
            if (proxyPassword != null) { // could be blank
                clientConfiguration.setProxyPassword(proxyPassword);
            }
        }
        if (maxConnections > 0) {
            clientConfiguration.setMaxConnections(maxConnections);
        }
        if (maxErrorRetry >= 0) { // 0 is allowed
            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
        }
        if (connectionTimeout >= 0) { // 0 is allowed
            clientConfiguration.setConnectionTimeout(connectionTimeout);
        }
        if (socketTimeout >= 0) { // 0 is allowed
            clientConfiguration.setSocketTimeout(socketTimeout);
        }

        // set up encryption
        encryptionMaterials = null;
        if (isNotBlank(keystoreFile)) {
            boolean confok = true;
            if (keystorePass == null) { // could be blank
                log.error("Keystore password missing");
                confok = false;
            }
            if (isBlank(privkeyAlias)) {
                log.error("Key alias missing");
                confok = false;
            }
            if (privkeyPass == null) { // could be blank
                log.error("Key password missing");
                confok = false;
            }
            if (!confok) {
                throw new RuntimeException("S3 Crypto configuration incomplete");
            }
            try {
                // Open keystore
                File ksFile = new File(keystoreFile);
                KeyStore keystore;
                try (FileInputStream ksStream = new FileInputStream(ksFile)) {
                    keystore = KeyStore.getInstance(KeyStore.getDefaultType());
                    keystore.load(ksStream, keystorePass.toCharArray());
                }
                // Get keypair for alias
                if (!keystore.isKeyEntry(privkeyAlias)) {
                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
                }
                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
                Certificate cert = keystore.getCertificate(privkeyAlias);
                PublicKey pubKey = cert.getPublicKey();
                KeyPair keypair = new KeyPair(pubKey, privKey);
                // Get encryptionMaterials from keypair
                encryptionMaterials = new EncryptionMaterials(keypair);
                cryptoConfiguration = new CryptoConfiguration();
            } catch (IOException | GeneralSecurityException e) {
                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
            }
        }
        isEncrypted = encryptionMaterials != null;

        AmazonS3Builder<?, ?> s3Builder;
        if (!isEncrypted) {
            s3Builder = AmazonS3ClientBuilder.standard()
                                             .withCredentials(awsCredentialsProvider)
                                             .withClientConfiguration(clientConfiguration);

        } else {
            s3Builder = AmazonS3EncryptionClientBuilder.standard()
                                                       .withClientConfiguration(clientConfiguration)
                                                       .withCryptoConfiguration(cryptoConfiguration)
                                                       .withCredentials(awsCredentialsProvider)
                                                       .withEncryptionMaterials(new StaticEncryptionMaterialsProvider(
                                                               encryptionMaterials));
        }
        if (pathStyleAccessEnabled) {
            log.debug("Path-style access enabled");
            s3Builder.enablePathStyleAccess();
        }
        if (isNotBlank(endpoint)) {
            s3Builder = s3Builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, bucketRegion));
        } else {
            s3Builder = s3Builder.withRegion(bucketRegion);
        }
        s3Builder.setAccelerateModeEnabled(accelerateModeEnabled);

        amazonS3 = s3Builder.build();

        // Try to create bucket if it doesn't exist
        try {
            if (!amazonS3.doesBucketExist(bucketName)) {
                amazonS3.createBucket(bucketName);
                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
            }
        } catch (AmazonClientException e) {
            throw new IOException(e);
        }

        // compat for NXP-17895, using "downloadfroms3", to be removed
        // these two fields have already been initialized by the base class initialize()
        // using standard property "directdownload"
        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
        if (dd != null) {
            directDownload = Boolean.parseBoolean(dd);
        }
        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
        if (dde >= 0) {
            directDownloadExpire = dde;
        }

        transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build();
        abortOldUploads();
    }

    @Override
    public TransferManager getTransferManager() {
        return transferManager;
    }

    protected void removeBinary(String digest) {
        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
    }

    @Override
    protected String getSystemPropertyPrefix() {
        return SYSTEM_PROPERTY_PREFIX;
    }

    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new S3BinaryGarbageCollector(this);
    }

    @Override
    public void removeBinaries(Collection<String> digests) {
        digests.forEach(this::removeBinary);
    }

    protected static boolean isMissingKey(AmazonClientException e) {
        if (e instanceof AmazonServiceException) {
            AmazonServiceException ase = (AmazonServiceException) e;
            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
                    || "Not Found".equals(e.getMessage());
        }
        return false;
    }

    /** @deprecated since 11.1, now unused */
    @Deprecated
    public static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    /**
     * Used in the healthCheck; the transferManager should be initialized and the bucket accessible
     *
     * @since 9.3
     */
    public boolean canAccessBucket() {
        return transferManager != null && transferManager.getAmazonS3Client().doesBucketExist(bucketName);
    }

    @Override
    protected FileStorage getFileStorage() {
        return new S3FileStorage();
    }

    /**
     * Gets the AWSCredentialsProvider.
     *
     * @since 10.2
     */
    public AWSCredentialsProvider getAwsCredentialsProvider() {
        return awsCredentialsProvider;
    }

    /**
     * Gets AmazonS3.
     *
     * @since 10.2
     */
    public AmazonS3 getAmazonS3() {
        return amazonS3;
    }

    /**
     * Gets the bucket name.
     *
     * @since 11.1
     */
    public String getBucketName() {
        return bucketName;
    }

    /**
     * Gets the bucket prefix.
     *
     * @since 11.1
     */
    public String getBucketPrefix() {
        return bucketNamePrefix;
    }

    @Override
    public String writeBlob(Blob blob) throws IOException {
        // Attempt to do S3 Copy if the Source Blob provider is also S3
        if (blob instanceof ManagedBlob) {
            ManagedBlob managedBlob = (ManagedBlob) blob;
            BlobProvider blobProvider = Framework.getService(BlobManager.class)
                                                 .getBlobProvider(managedBlob.getProviderId());
            if (blobProvider instanceof S3BinaryManager && blobProvider != this) {
                // use S3 direct copy as the source blob provider is also S3
                String key = copyBlob((S3BinaryManager) blobProvider, managedBlob.getKey());
                if (key != null) {
                    return key;
                }
            }
        }
        return super.writeBlob(blob);
    }

    /**
     * Copies a blob. Returns {@code null} if the copy was not possible.
     *
     * @param sourceBlobProvider the source blob provider
     * @param blobKey the source blob key
     * @return the copied blob key, or {@code null} if the copy was not possible
     * @since 10.1
     */
    protected String copyBlob(S3BinaryManager sourceBlobProvider, String blobKey) throws IOException {
        String digest = blobKey;
        int colon = digest.indexOf(':');
        if (colon >= 0) {
            digest = digest.substring(colon + 1);
        }
        String sourceBucketName = sourceBlobProvider.bucketName;
        String sourceKey = sourceBlobProvider.bucketNamePrefix + digest;
        String key = bucketNamePrefix + digest;
        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("copying blob " + sourceKey + " to " + key);
        }

        try {
            amazonS3.getObjectMetadata(bucketName, key);
            if (log.isDebugEnabled()) {
                log.debug("blob " + key + " is already in S3");
            }
            return digest;
        } catch (AmazonServiceException e) {
            if (!isMissingKey(e)) {
                throw new IOException(e);
            }
            // object does not exist, just continue
        }

        // not already present -> copy the blob
        ObjectMetadata sourceMetadata;
        try {
            sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceKey);
        } catch (AmazonServiceException e) {
            throw new NuxeoException("Source blob does not exist: s3://" + sourceBucketName + "/" + sourceKey, e);
        }
        long length = sourceMetadata.getContentLength();
        try {
            CopyObjectRequest copyObjectRequest = new CopyObjectRequest(sourceBucketName, sourceKey, bucketName, key);
            if (useServerSideEncryption) {
                if (isNotBlank(serverSideKMSKeyID)) { // TODO
                    log.warn("S3 copy not supported with KMS, falling back to regular copy");
                    return null;
                }
                // SSE-S3
                ObjectMetadata newObjectMetadata = new ObjectMetadata();
                newObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                copyObjectRequest.setNewObjectMetadata(newObjectMetadata);
            }
            Copy copy = transferManager.copy(copyObjectRequest);
            try {
                copy.waitForCompletion();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new NuxeoException(e);
            }
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("copied blob " + sourceKey + " to " + key + " in " + dtms + "ms");
            }
            return digest;
        } catch (AmazonServiceException e) {
            String message = "S3 copy not supported from s3://" + sourceBucketName + "/" + sourceKey + " to s3://"
                    + bucketName + "/" + key + " (" + length + " bytes)";
            log.warn(message + ", falling back to regular copy: " + e.getMessage());
            log.debug(message, e);
            return null;
        }
    }

    public class S3FileStorage implements FileStorage {

        @Override
        public void storeFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("storing blob " + digest + " to S3");
            }
            String key = bucketNamePrefix + digest;
            try {
                amazonS3.getObjectMetadata(bucketName, key);
                if (log.isDebugEnabled()) {
                    log.debug("blob " + digest + " is already in S3");
                }
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                // not already present -> store the blob
                PutObjectRequest request;
                if (!isEncrypted) {
                    request = new PutObjectRequest(bucketName, key, file);
                    if (useServerSideEncryption) {
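                        // server-side encryption: use SSE-KMS when a KMS key is configured, otherwise SSE-S3 (AES-256)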
                        ObjectMetadata objectMetadata = new ObjectMetadata();
                        if (isNotBlank(serverSideKMSKeyID)) {
                            SSEAwsKeyManagementParams keyManagementParams = new SSEAwsKeyManagementParams(
                                    serverSideKMSKeyID);
                            request = request.withSSEAwsKeyManagementParams(keyManagementParams);
                        } else {
                            objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                        }
                        request.setMetadata(objectMetadata);
                    }
                } else {
                    request = new EncryptedPutObjectRequest(bucketName, key, file);
                }
                Upload upload = transferManager.upload(request);
                try {
                    upload.waitForUploadResult();
                } catch (AmazonClientException ee) {
                    throw new IOException(ee);
                } catch (InterruptedException ee) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ee);
                } finally {
                    if (log.isDebugEnabled()) {
                        long dtms = System.currentTimeMillis() - t0;
                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
                    }
                }
            }
        }

        @Override
        public boolean fetchFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("fetching blob " + digest + " from S3");
            }
            try {
                Download download = transferManager.download(
                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
                download.waitForCompletion();
                if (isEncrypted) {
                    // can't easily check the decrypted digest
                    return true;
                }
                if (!digest.equals(download.getObjectMetadata().getETag())) {
                    // if our digest algorithm is not MD5 (so the ETag can never match),
                    // or in case of a multipart upload (where the ETag may not be the MD5),
                    // check manually the object integrity
                    // TODO this is costly and it should be possible to deactivate it
                    String currentDigest = new DigestUtils(getDigestAlgorithm()).digestAsHex(file);
                    if (!currentDigest.equals(digest)) {
                        String msg = "Invalid S3 object digest, expected=" + digest + " actual=" + currentDigest;
                        log.error(msg);
                        throw new IOException(msg);
                    }
                }
                return true;
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                if (log.isDebugEnabled()) {
                    long dtms = System.currentTimeMillis() - t0;
                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
                }
            }
        }
    }

    /**
     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
     */
    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {

        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
            super(binaryManager);
        }

        @Override
        public String getId() {
            return "s3:" + binaryManager.bucketName;
        }

        @Override
        public Set<String> getUnmarkedBlobs() {
            // list S3 objects in the bucket
            // record those not marked
            Set<String> unmarked = new HashSet<>();
            ObjectListing list = null;
            do {
                if (list == null) {
                    // use delimiter to avoid useless listing of objects in "subdirectories"
                    ListObjectsRequest listObjectsRequest = new ListObjectsRequest(binaryManager.bucketName,
                            binaryManager.bucketNamePrefix, null, DELIMITER, null);
                    list = binaryManager.amazonS3.listObjects(listObjectsRequest);
                } else {
                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
                }
                int prefixLength = binaryManager.bucketNamePrefix.length();
                for (S3ObjectSummary summary : list.getObjectSummaries()) {
                    String digest = summary.getKey().substring(prefixLength);
                    if (!binaryManager.isValidDigest(digest)) {
                        // ignore files that cannot be digests, for safety
                        continue;
                    }
                    long length = summary.getSize();
                    if (marked.contains(digest)) {
                        status.numBinaries++;
                        status.sizeBinaries += length;
                    } else {
                        status.numBinariesGC++;
                        status.sizeBinariesGC += length;
                        // record file to delete
                        unmarked.add(digest);
                        marked.remove(digest); // optimize memory
                    }
                }
            } while (list.isTruncated());

            return unmarked;
        }
    }

    // ******************** BlobProvider ********************

    @Override
    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
        String key = bucketNamePrefix + digest;
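        // directDownloadExpire is in seconds; compute the absolute expiration time of the pre-signed URL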
        Date expiration = new Date();
        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null));
        request.setExpiration(expiration);
        URL url = amazonS3.generatePresignedUrl(request);
        try {
            return url.toURI();
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }

}