/*
 * (C) Copyright 2011-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Mathieu Guillaume
 *     Florent Guillaume
 *     Luís Duarte
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.core.storage.sql.S3Utils.NON_MULTIPART_COPY_MAX_SIZE;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServletRequest;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.aws.NuxeoAWSRegionProvider;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;

/**
 * A Binary Manager that stores binaries as S3 BLOBs.
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
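 * <p>
 * A minimal configuration sketch for {@code nuxeo.conf} (illustrative values; it assumes the usual combination of the
 * {@code nuxeo.s3storage} prefix with the property keys defined by the constants below):
 *
 * <pre>{@code
 * nuxeo.s3storage.bucket=my-bucket
 * nuxeo.s3storage.region=eu-west-1
 * nuxeo.s3storage.awsid=my-access-key-id
 * nuxeo.s3storage.awssecret=my-secret-key
 * }</pre>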
 */
public class S3BinaryManager extends AbstractCloudBinaryManager {

    private static final String MD5 = "MD5"; // must be MD5 for Etag

    @Override
    protected String getDefaultDigestAlgorithm() {
        return MD5;
    }

    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";

    public static final String BUCKET_NAME_PROPERTY = "bucket";

    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

    public static final String BUCKET_REGION_PROPERTY = "region";

    public static final String AWS_ID_PROPERTY = "awsid";

    public static final String AWS_SECRET_PROPERTY = "awssecret";

    /**
     * @since 10.10
     */
    public static final String AWS_SESSION_TOKEN_PROPERTY = "awstoken";

    /** AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_PROPERTY = "connection.max";

    /** AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";

    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";

    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";

    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";

    public static final String SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY = "crypt.kms.key";

    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";

    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";

    public static final String ENDPOINT_PROPERTY = "endpoint";

    /**
     * @since 10.3
     */
    public static final String PATHSTYLEACCESS_PROPERTY = "pathstyleaccess";

    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";

    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";

    public static final String DELIMITER = "/";

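    /** Pattern matching a 32-character lowercase hexadecimal MD5 digest. */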
    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    protected String bucketName;

    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    protected EncryptionMaterials encryptionMaterials;

    protected boolean isEncrypted;

    protected CryptoConfiguration cryptoConfiguration;

    protected boolean useServerSideEncryption;

    protected String serverSideKMSKeyID;

    protected AmazonS3 amazonS3;

    protected TransferManager transferManager;

    @Override
    public void close() {
        // this also shuts down the AmazonS3Client
        transferManager.shutdownNow();
        super.close();
    }

    /**
     * Aborts uploads that crashed and are older than 1 day.
     *
     * @since 7.2
     */
    protected void abortOldUploads() throws IOException {
        int oneDay = 1000 * 60 * 60 * 24;
        try {
            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
        } catch (AmazonS3Exception e) {
            if (e.getStatusCode() == 400 || e.getStatusCode() == 404) {
                log.error("Your cloud provider does not support aborting old uploads");
                return;
            }
            throw new IOException("Failed to abort old uploads", e);
        }
    }

    @Override
    protected void setupCloudClient() throws IOException {
        // Get settings from the configuration
        bucketName = getProperty(BUCKET_NAME_PROPERTY);
        // the bucket prefix is optional, so don't use the fallback mechanism
        bucketNamePrefix = StringUtils.defaultString(properties.get(BUCKET_PREFIX_PROPERTY));
        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
        if (isBlank(bucketRegion)) {
            bucketRegion = NuxeoAWSRegionProvider.getInstance().getRegion();
        }
        String awsID = getProperty(AWS_ID_PROPERTY);
        String awsSecret = getProperty(AWS_SECRET_PROPERTY);
        String awsToken = getProperty(AWS_SESSION_TOKEN_PROPERTY);

        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);

        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);

        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
        String endpoint = getProperty(ENDPOINT_PROPERTY);
        boolean pathStyleAccessEnabled = getBooleanProperty(PATHSTYLEACCESS_PROPERTY);
        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
        if (isNotBlank(sseprop)) {
            useServerSideEncryption = Boolean.parseBoolean(sseprop);
            serverSideKMSKeyID = getProperty(SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY);
        }

        if (isBlank(bucketName)) {
            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
        }

        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith(DELIMITER)) {
            log.warn(String.format("S3 bucket prefix %s=%s should end with '/': added automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
            bucketNamePrefix += DELIMITER;
        }
        if (isNotBlank(namespace)) {
            // use namespace as an additional prefix
            bucketNamePrefix += namespace;
            if (!bucketNamePrefix.endsWith(DELIMITER)) {
                bucketNamePrefix += DELIMITER;
            }
        }

        // set up credentials
        awsCredentialsProvider = S3Utils.getAWSCredentialsProvider(awsID, awsSecret, awsToken);

        // set up client configuration
        clientConfiguration = new ClientConfiguration();
        if (isNotBlank(proxyHost)) {
            clientConfiguration.setProxyHost(proxyHost);
        }
        if (isNotBlank(proxyPort)) {
            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
        }
        if (isNotBlank(proxyLogin)) {
            clientConfiguration.setProxyUsername(proxyLogin);
        }
        if (proxyPassword != null) { // could be blank
            clientConfiguration.setProxyPassword(proxyPassword);
        }
        if (maxConnections > 0) {
            clientConfiguration.setMaxConnections(maxConnections);
        }
        if (maxErrorRetry >= 0) { // 0 is allowed
            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
        }
        if (connectionTimeout >= 0) { // 0 is allowed
            clientConfiguration.setConnectionTimeout(connectionTimeout);
        }
        if (socketTimeout >= 0) { // 0 is allowed
            clientConfiguration.setSocketTimeout(socketTimeout);
        }

        // set up encryption
        encryptionMaterials = null;
        if (isNotBlank(keystoreFile)) {
            boolean confok = true;
            if (keystorePass == null) { // could be blank
                log.error("Keystore password missing");
                confok = false;
            }
            if (isBlank(privkeyAlias)) {
                log.error("Key alias missing");
                confok = false;
            }
            if (privkeyPass == null) { // could be blank
                log.error("Key password missing");
                confok = false;
            }
            if (!confok) {
                throw new RuntimeException("S3 Crypto configuration incomplete");
            }
            try {
                // Open keystore
                File ksFile = new File(keystoreFile);
                KeyStore keystore;
                try (FileInputStream ksStream = new FileInputStream(ksFile)) {
                    keystore = KeyStore.getInstance(KeyStore.getDefaultType());
                    keystore.load(ksStream, keystorePass.toCharArray());
                }
                // Get keypair for alias
                if (!keystore.isKeyEntry(privkeyAlias)) {
                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
                }
                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
                Certificate cert = keystore.getCertificate(privkeyAlias);
                PublicKey pubKey = cert.getPublicKey();
                KeyPair keypair = new KeyPair(pubKey, privKey);
                // Get encryptionMaterials from keypair
                encryptionMaterials = new EncryptionMaterials(keypair);
                cryptoConfiguration = new CryptoConfiguration();
            } catch (IOException | GeneralSecurityException e) {
                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
            }
        }
        isEncrypted = encryptionMaterials != null;

        AmazonS3Builder<?, ?> s3Builder;
        // set up the S3 client builder, with client-side encryption if configured
        if (!isEncrypted) {
            s3Builder = AmazonS3ClientBuilder.standard()
                                             .withCredentials(awsCredentialsProvider)
                                             .withClientConfiguration(clientConfiguration);

        } else {
            s3Builder = AmazonS3EncryptionClientBuilder.standard()
                                                       .withClientConfiguration(clientConfiguration)
                                                       .withCryptoConfiguration(cryptoConfiguration)
                                                       .withCredentials(awsCredentialsProvider)
                                                       .withEncryptionMaterials(new StaticEncryptionMaterialsProvider(
                                                               encryptionMaterials));
        }
        if (pathStyleAccessEnabled) {
            log.debug("Path-style access enabled");
            s3Builder.enablePathStyleAccess();
        }
        if (isNotBlank(endpoint)) {
            s3Builder = s3Builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, bucketRegion));
        } else {
            s3Builder = s3Builder.withRegion(bucketRegion);
        }

        amazonS3 = s3Builder.build();

        // try to create the bucket if it doesn't exist
        try {
            if (!amazonS3.doesBucketExist(bucketName)) {
                amazonS3.createBucket(bucketName);
                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
            }
        } catch (AmazonClientException e) {
            throw new IOException(e);
        }

        // compat for NXP-17895, using "downloadfroms3", to be removed
        // these two fields have already been initialized by the base class initialize()
        // using standard property "directdownload"
        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
        if (dd != null) {
            directDownload = Boolean.parseBoolean(dd);
        }
        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
        if (dde >= 0) {
            directDownloadExpire = dde;
        }

        transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build();
        abortOldUploads();
    }

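    /** Removes a single object from the bucket, addressed by its digest under the configured bucket prefix. */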
    protected void removeBinary(String digest) {
        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
    }

    @Override
    protected String getSystemPropertyPrefix() {
        return SYSTEM_PROPERTY_PREFIX;
    }

    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new S3BinaryGarbageCollector(this);
    }

    @Override
    public void removeBinaries(Collection<String> digests) {
        digests.forEach(this::removeBinary);
    }

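    /** Checks whether the exception means that the S3 object key does not exist (404 / NoSuchKey). */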
    protected static boolean isMissingKey(AmazonClientException e) {
        if (e instanceof AmazonServiceException) {
            AmazonServiceException ase = (AmazonServiceException) e;
            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
                    || "Not Found".equals(e.getMessage());
        }
        return false;
    }

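    /** Checks whether the digest has the form of an MD5 hex string (32 lowercase hexadecimal characters). */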
    public static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    /**
     * Used in the healthCheck; the transferManager should be initialized and the bucket accessible.
     *
     * @since 9.3
     */
    public boolean canAccessBucket() {
        return transferManager != null && transferManager.getAmazonS3Client().doesBucketExist(bucketName);
    }

    @Override
    protected FileStorage getFileStorage() {
        return new S3FileStorage();
    }

    /**
     * Gets the AWSCredentialsProvider.
     *
     * @since 10.2
     */
    public AWSCredentialsProvider getAwsCredentialsProvider() {
        return awsCredentialsProvider;
    }

    /**
     * Gets AmazonS3.
     *
     * @since 10.2
     */
    public AmazonS3 getAmazonS3() {
        return amazonS3;
    }

    @Override
    public String writeBlob(Blob blob) throws IOException {
        // attempt a direct S3-to-S3 copy if the source blob provider is also S3
        if (blob instanceof ManagedBlob) {
            ManagedBlob managedBlob = (ManagedBlob) blob;
            BlobProvider blobProvider = Framework.getService(BlobManager.class)
                                                 .getBlobProvider(managedBlob.getProviderId());
            if (blobProvider instanceof S3BinaryManager && blobProvider != this) {
                // use S3 direct copy as the source blob provider is also S3
                String key = copyBlob((S3BinaryManager) blobProvider, managedBlob.getKey());
                if (key != null) {
                    return key;
                }
            }
        }
        return super.writeBlob(blob);
    }

    /**
     * Copies a blob. Returns {@code null} if the copy was not possible.
     *
     * @param sourceBlobProvider the source blob provider
     * @param blobKey the source blob key
     * @return the copied blob key, or {@code null} if the copy was not possible
     * @throws IOException if an error occurs while accessing S3
     * @since 10.1
     */
    protected String copyBlob(S3BinaryManager sourceBlobProvider, String blobKey) throws IOException {
        String digest = blobKey;
        int colon = digest.indexOf(':');
        if (colon >= 0) {
            digest = digest.substring(colon + 1);
        }
        String sourceBucketName = sourceBlobProvider.bucketName;
        String sourceKey = sourceBlobProvider.bucketNamePrefix + digest;
        String key = bucketNamePrefix + digest;
        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("copying blob " + sourceKey + " to " + key);
        }

        try {
            amazonS3.getObjectMetadata(bucketName, key);
            if (log.isDebugEnabled()) {
                log.debug("blob " + key + " is already in S3");
            }
            return digest;
        } catch (AmazonServiceException e) {
            if (!isMissingKey(e)) {
                throw new IOException(e);
            }
            // object does not exist, just continue
        }

        // not already present -> copy the blob
        ObjectMetadata sourceMetadata;
        try {
            sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceKey);
        } catch (AmazonServiceException e) {
            throw new NuxeoException("Source blob does not exist: s3://" + sourceBucketName + "/" + sourceKey, e);
        }
        try {
            if (sourceMetadata.getContentLength() > NON_MULTIPART_COPY_MAX_SIZE) {
                S3Utils.copyFileMultipart(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true);
            } else {
                S3Utils.copyFile(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true);
            }
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("copied blob " + sourceKey + " to " + key + " in " + dtms + "ms");
            }
            return digest;
        } catch (AmazonServiceException e) {
            log.warn("direct S3 copy not supported, please check your keys and policies", e);
            return null;
        }
    }

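    /** File storage that stores and fetches files as S3 objects, keyed by their digest under the bucket prefix. */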
    public class S3FileStorage implements FileStorage {

        @Override
        public void storeFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("storing blob " + digest + " to S3");
            }
            String key = bucketNamePrefix + digest;
            try {
                amazonS3.getObjectMetadata(bucketName, key);
                if (log.isDebugEnabled()) {
                    log.debug("blob " + digest + " is already in S3");
                }
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                // not already present -> store the blob
                PutObjectRequest request;
                if (!isEncrypted) {
                    request = new PutObjectRequest(bucketName, key, file);
                    if (useServerSideEncryption) {
                        ObjectMetadata objectMetadata = new ObjectMetadata();
                        if (isNotBlank(serverSideKMSKeyID)) {
                            SSEAwsKeyManagementParams keyManagementParams = new SSEAwsKeyManagementParams(
                                    serverSideKMSKeyID);
                            request = request.withSSEAwsKeyManagementParams(keyManagementParams);
                        } else {
                            objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                        }
                        request.setMetadata(objectMetadata);
                    }
                } else {
                    request = new EncryptedPutObjectRequest(bucketName, key, file);
                }
                Upload upload = transferManager.upload(request);
                try {
                    upload.waitForUploadResult();
                } catch (AmazonClientException ee) {
                    throw new IOException(ee);
                } catch (InterruptedException ee) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ee);
                } finally {
                    if (log.isDebugEnabled()) {
                        long dtms = System.currentTimeMillis() - t0;
                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
                    }
                }
            }
        }

        @Override
        public boolean fetchFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("fetching blob " + digest + " from S3");
            }
            try {
                Download download = transferManager.download(
                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
                download.waitForCompletion();
                // check the ETag; it is the MD5 digest by default, unless the upload was multipart
                if (!isEncrypted && !digest.equals(download.getObjectMetadata().getETag())) {
                    // this happens for multipart uploads, so verify the downloaded file by recomputing its digest
                    String currentDigest;
                    try (FileInputStream input = new FileInputStream(file)) {
                        currentDigest = DigestUtils.md5Hex(input);
                    }
                    if (!currentDigest.equals(digest)) {
                        log.error("Invalid ETag in S3, currentDigest=" + currentDigest + " expectedDigest=" + digest);
                        throw new IOException("Invalid S3 object, it is corrupted: expected digest is " + digest
                                + " got " + currentDigest);
                    }
                }
                return true;
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                if (log.isDebugEnabled()) {
                    long dtms = System.currentTimeMillis() - t0;
                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
                }
            }

        }
    }

    /**
     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
     */
    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {

        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
            super(binaryManager);
        }

        @Override
        public String getId() {
            return "s3:" + binaryManager.bucketName;
        }

        @Override
        public Set<String> getUnmarkedBlobs() {
            // list S3 objects in the bucket
            // record those not marked
            Set<String> unmarked = new HashSet<>();
            ObjectListing list = null;
            do {
                if (list == null) {
                    // use delimiter to avoid useless listing of objects in "subdirectories"
                    ListObjectsRequest listObjectsRequest = new ListObjectsRequest(binaryManager.bucketName,
                            binaryManager.bucketNamePrefix, null, DELIMITER, null);
                    list = binaryManager.amazonS3.listObjects(listObjectsRequest);
                } else {
                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
                }
                int prefixLength = binaryManager.bucketNamePrefix.length();
                for (S3ObjectSummary summary : list.getObjectSummaries()) {
                    String digest = summary.getKey().substring(prefixLength);
                    if (!isMD5(digest)) {
                        // ignore files that cannot be MD5 digests, for safety
                        continue;
                    }
                    long length = summary.getSize();
                    if (marked.contains(digest)) {
                        status.numBinaries++;
                        status.sizeBinaries += length;
                    } else {
                        status.numBinariesGC++;
                        status.sizeBinariesGC += length;
                        // record file to delete
                        unmarked.add(digest);
                        marked.remove(digest); // optimize memory
                    }
                }
            } while (list.isTruncated());

            return unmarked;
        }
    }

    // ******************** BlobProvider ********************

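    /** Builds a time-limited presigned S3 URL so the client can download the blob directly from S3. */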
    @Override
    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
        String key = bucketNamePrefix + digest;
        Date expiration = new Date();
        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null));
        request.setExpiration(expiration);
        URL url = amazonS3.generatePresignedUrl(request);
        try {
            return url.toURI();
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }

}