/*
 * (C) Copyright 2011-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Mathieu Guillaume
 *     Florent Guillaume
 *     Luís Duarte
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.core.storage.sql.S3Utils.NON_MULTIPART_COPY_MAX_SIZE;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServletRequest;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.runtime.api.Framework;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.google.common.base.MoreObjects;

/**
 * A Binary Manager that stores binaries as S3 BLOBs
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
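 * <p>
 * Configuration properties are read with the {@code nuxeo.s3storage.*} prefix (see the property name constants
 * below), for instance {@code nuxeo.s3storage.bucket} for the bucket name.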
 */
public class S3BinaryManager extends AbstractCloudBinaryManager {

    private static final String MD5 = "MD5"; // must be MD5 for ETag

    @Override
    protected String getDefaultDigestAlgorithm() {
        return MD5;
    }

    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";

    public static final String BUCKET_NAME_PROPERTY = "bucket";

    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

    public static final String BUCKET_REGION_PROPERTY = "region";

    public static final String DEFAULT_BUCKET_REGION = "us-east-1"; // US East

    public static final String AWS_ID_PROPERTY = "awsid";

    public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID";

    public static final String AWS_SECRET_PROPERTY = "awssecret";

    public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY";

    /** AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_PROPERTY = "connection.max";

    /** AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";

    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";

    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";

    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";

    public static final String SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY = "crypt.kms.key";

    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";

    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";

    public static final String ENDPOINT_PROPERTY = "endpoint";

    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";

    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";
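
    // Illustrative configuration in nuxeo.conf (example values, not defaults):
    //   nuxeo.s3storage.bucket=my-bucket
    //   nuxeo.s3storage.bucket_prefix=binaries/
    //   nuxeo.s3storage.region=eu-west-1
    //   nuxeo.s3storage.awsid=...
    //   nuxeo.s3storage.awssecret=...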

    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    protected String bucketName;

    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    protected EncryptionMaterials encryptionMaterials;

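    /** Whether client-side encryption (from a local keystore) is configured; independent of server-side encryption. */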
    protected boolean isEncrypted;

    protected CryptoConfiguration cryptoConfiguration;

    protected boolean useServerSideEncryption;

    protected String serverSideKMSKeyID;

    protected AmazonS3 amazonS3;

    protected TransferManager transferManager;

    @Override
    public void close() {
        // this also shuts down the AmazonS3Client
        transferManager.shutdownNow();
        super.close();
    }

    /**
     * Aborts uploads that crashed and are older than 1 day.
     *
     * @since 7.2
     */
    protected void abortOldUploads() throws IOException {
        int oneDay = 1000 * 60 * 60 * 24;
        try {
            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
        } catch (AmazonS3Exception e) {
            if (e.getStatusCode() == 400 || e.getStatusCode() == 404) {
                log.error("Your cloud provider does not support aborting old uploads");
                return;
            }
            throw new IOException("Failed to abort old uploads", e);
        }
    }

    @Override
    protected void setupCloudClient() throws IOException {
        // Get settings from the configuration
        bucketName = getProperty(BUCKET_NAME_PROPERTY);
        bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY);
        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
        if (isBlank(bucketRegion)) {
            bucketRegion = DEFAULT_BUCKET_REGION;
        }
        String awsID = getProperty(AWS_ID_PROPERTY);
        String awsSecret = getProperty(AWS_SECRET_PROPERTY);

        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);

        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);

        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
        String endpoint = getProperty(ENDPOINT_PROPERTY);
        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
        if (isNotBlank(sseprop)) {
            useServerSideEncryption = Boolean.parseBoolean(sseprop);
            serverSideKMSKeyID = getProperty(SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY);
        }

        // Fallback on default env keys for ID and secret
        if (isBlank(awsID)) {
            awsID = System.getenv(AWS_ID_ENV);
        }
        if (isBlank(awsSecret)) {
            awsSecret = System.getenv(AWS_SECRET_ENV);
        }

        if (isBlank(bucketName)) {
            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
        }

        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) {
            log.warn(String.format("%s=%s: S3 bucket prefix should end with '/', it was added automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
            bucketNamePrefix += "/";
        }
        // set up credentials
        awsCredentialsProvider = S3Utils.getAWSCredentialsProvider(awsID, awsSecret);

        // set up client configuration
        clientConfiguration = new ClientConfiguration();
        if (isNotBlank(proxyHost)) {
            clientConfiguration.setProxyHost(proxyHost);
        }
        if (isNotBlank(proxyPort)) {
            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
        }
        if (isNotBlank(proxyLogin)) {
            clientConfiguration.setProxyUsername(proxyLogin);
        }
        if (proxyPassword != null) { // could be blank
            clientConfiguration.setProxyPassword(proxyPassword);
        }
        if (maxConnections > 0) {
            clientConfiguration.setMaxConnections(maxConnections);
        }
        if (maxErrorRetry >= 0) { // 0 is allowed
            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
        }
        if (connectionTimeout >= 0) { // 0 is allowed
            clientConfiguration.setConnectionTimeout(connectionTimeout);
        }
        if (socketTimeout >= 0) { // 0 is allowed
            clientConfiguration.setSocketTimeout(socketTimeout);
        }

        // set up encryption
        encryptionMaterials = null;
        if (isNotBlank(keystoreFile)) {
            boolean confok = true;
            if (keystorePass == null) { // could be blank
                log.error("Keystore password missing");
                confok = false;
            }
            if (isBlank(privkeyAlias)) {
                log.error("Key alias missing");
                confok = false;
            }
            if (privkeyPass == null) { // could be blank
                log.error("Key password missing");
                confok = false;
            }
            if (!confok) {
                throw new RuntimeException("S3 Crypto configuration incomplete");
            }
            try {
                // Open keystore
                File ksFile = new File(keystoreFile);
                KeyStore keystore;
                try (FileInputStream ksStream = new FileInputStream(ksFile)) {
                    keystore = KeyStore.getInstance(KeyStore.getDefaultType());
                    keystore.load(ksStream, keystorePass.toCharArray());
                }
                // Get keypair for alias
                if (!keystore.isKeyEntry(privkeyAlias)) {
                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
                }
                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
                Certificate cert = keystore.getCertificate(privkeyAlias);
                PublicKey pubKey = cert.getPublicKey();
                KeyPair keypair = new KeyPair(pubKey, privKey);
                // Get encryptionMaterials from keypair
                encryptionMaterials = new EncryptionMaterials(keypair);
                cryptoConfiguration = new CryptoConfiguration();
            } catch (IOException | GeneralSecurityException e) {
                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
            }
        }
        isEncrypted = encryptionMaterials != null;

        @SuppressWarnings("rawtypes")
        AwsClientBuilder s3Builder;
        // build the S3 client, with client-side encryption if encryption materials are configured
        if (!isEncrypted) {
            s3Builder = AmazonS3ClientBuilder.standard()
                                             .withCredentials(awsCredentialsProvider)
                                             .withClientConfiguration(clientConfiguration);
        } else {
            s3Builder = AmazonS3EncryptionClientBuilder.standard()
                                                       .withClientConfiguration(clientConfiguration)
                                                       .withCryptoConfiguration(cryptoConfiguration)
                                                       .withCredentials(awsCredentialsProvider)
                                                       .withEncryptionMaterials(new StaticEncryptionMaterialsProvider(
                                                               encryptionMaterials));
        }
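        // a custom endpoint is typically used for S3-compatible storage backends; otherwise target the bucket region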
        if (isNotBlank(endpoint)) {
            s3Builder = s3Builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, bucketRegion));
        } else {
            s3Builder = s3Builder.withRegion(bucketRegion);
        }

        amazonS3 = (AmazonS3) s3Builder.build();

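        // create the bucket if it does not exist yet, and make it private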
        try {
            if (!amazonS3.doesBucketExist(bucketName)) {
                amazonS3.createBucket(bucketName);
                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
            }
        } catch (AmazonClientException e) {
            throw new IOException(e);
        }

        // compat for NXP-17895, using "downloadfroms3", to be removed
        // these two fields have already been initialized by the base class initialize()
        // using standard property "directdownload"
        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
        if (dd != null) {
            directDownload = Boolean.parseBoolean(dd);
        }
        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
        if (dde >= 0) {
            directDownloadExpire = dde;
        }

        transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build();
        abortOldUploads();
    }

    protected void removeBinary(String digest) {
        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
    }

    @Override
    protected String getSystemPropertyPrefix() {
        return SYSTEM_PROPERTY_PREFIX;
    }

    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new S3BinaryGarbageCollector(this);
    }

    @Override
    public void removeBinaries(Collection<String> digests) {
        digests.forEach(this::removeBinary);
    }

    protected static boolean isMissingKey(AmazonClientException e) {
        if (e instanceof AmazonServiceException) {
            AmazonServiceException ase = (AmazonServiceException) e;
            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
                    || "Not Found".equals(e.getMessage());
        }
        return false;
    }

    public static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    /**
     * Used by the health check; returns {@code true} if the transferManager is initialized and the bucket is
     * accessible.
     *
     * @since 9.3
     */
    public boolean canAccessBucket() {
        return transferManager != null && transferManager.getAmazonS3Client().doesBucketExist(bucketName);
    }

    @Override
    protected FileStorage getFileStorage() {
        return new S3FileStorage();
    }

    /**
     * Gets the AWSCredentialsProvider.
     * @since 10.2
     */
    public AWSCredentialsProvider getAwsCredentialsProvider() {
        return awsCredentialsProvider;
    }

    /**
     * Gets AmazonS3.
     * @since 10.2
     */
    public AmazonS3 getAmazonS3() {
        return amazonS3;
    }

    @Override
    public String writeBlob(Blob blob) throws IOException {
        // Attempt to do S3 Copy if the Source Blob provider is also S3
        if (blob instanceof ManagedBlob) {
            ManagedBlob managedBlob = (ManagedBlob) blob;
            BlobProvider blobProvider = Framework.getService(BlobManager.class)
                                                 .getBlobProvider(managedBlob.getProviderId());
            if (blobProvider instanceof S3BinaryManager && blobProvider != this) {
                // use S3 direct copy as the source blob provider is also S3
                String key = copyBlob((S3BinaryManager) blobProvider, managedBlob.getKey());
                if (key != null) {
                    return key;
                }
            }
        }
        return super.writeBlob(blob);
    }

    /**
     * Copies a blob. Returns {@code null} if the copy was not possible.
     *
     * @param sourceBlobProvider the source blob provider
     * @param blobKey the source blob key
     * @return the copied blob key, or {@code null} if the copy was not possible
     * @throws IOException
     * @since 10.1
     */
    protected String copyBlob(S3BinaryManager sourceBlobProvider, String blobKey) throws IOException {
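        // the blob key may be prefixed with the blob provider id ("providerId:digest"); keep only the digest part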
        String digest = blobKey;
        int colon = digest.indexOf(':');
        if (colon >= 0) {
            digest = digest.substring(colon + 1);
        }
        String sourceBucketName = sourceBlobProvider.bucketName;
        String sourceKey = sourceBlobProvider.bucketNamePrefix + digest;
        String key = bucketNamePrefix + digest;
        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("copying blob " + sourceKey + " to " + key);
        }

        try {
            amazonS3.getObjectMetadata(bucketName, key);
            if (log.isDebugEnabled()) {
                log.debug("blob " + key + " is already in S3");
            }
            return digest;
        } catch (AmazonServiceException e) {
            if (!isMissingKey(e)) {
                throw new IOException(e);
            }
            // object does not exist, just continue
        }

        // not already present -> copy the blob
        ObjectMetadata sourceMetadata;
        try {
            sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceKey);
        } catch (AmazonServiceException e) {
            throw new NuxeoException("Source blob does not exist: s3://" + sourceBucketName + "/" + sourceKey, e);
        }
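        // a single S3 copy operation is limited in size (5 GB); use multipart copy above the configured threshold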
        try {
            if (sourceMetadata.getContentLength() > NON_MULTIPART_COPY_MAX_SIZE) {
                S3Utils.copyFileMultipart(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true);
            } else {
                S3Utils.copyFile(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true);
            }
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("copied blob " + sourceKey + " to " + key + " in " + dtms + "ms");
            }
            return digest;
        } catch (AmazonServiceException e) {
            log.warn("direct S3 copy not supported, please check your keys and policies", e);
            return null;
        }
    }

    public class S3FileStorage implements FileStorage {

        @Override
        public void storeFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("storing blob " + digest + " to S3");
            }
            String key = bucketNamePrefix + digest;
            try {
                amazonS3.getObjectMetadata(bucketName, key);
                if (log.isDebugEnabled()) {
                    log.debug("blob " + digest + " is already in S3");
                }
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                // not already present -> store the blob
                PutObjectRequest request;
                if (!isEncrypted) {
                    request = new PutObjectRequest(bucketName, key, file);
                    if (useServerSideEncryption) {
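                        // use SSE-KMS when a KMS key id is configured, otherwise default server-side encryption (AES-256)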
                        ObjectMetadata objectMetadata = new ObjectMetadata();
                        if (isNotBlank(serverSideKMSKeyID)) {
                            SSEAwsKeyManagementParams keyManagementParams =
                                new SSEAwsKeyManagementParams(serverSideKMSKeyID);
                            request = request.withSSEAwsKeyManagementParams(keyManagementParams);
                        } else {
                            objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                        }
                        request.setMetadata(objectMetadata);
                    }
                } else {
                    request = new EncryptedPutObjectRequest(bucketName, key, file);
                }
                Upload upload = transferManager.upload(request);
                try {
                    upload.waitForUploadResult();
                } catch (AmazonClientException ee) {
                    throw new IOException(ee);
                } catch (InterruptedException ee) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ee);
                } finally {
                    if (log.isDebugEnabled()) {
                        long dtms = System.currentTimeMillis() - t0;
                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
                    }
                }
            }
        }

        @Override
        public boolean fetchFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("fetching blob " + digest + " from S3");
            }
            try {
                Download download = transferManager.download(
                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
                download.waitForCompletion();
                // check the ETag; for non-multipart uploads it is the MD5 digest by default
                if (!isEncrypted && !digest.equals(download.getObjectMetadata().getETag())) {
                    // for multipart uploads the ETag is not the MD5, so verify the downloaded file instead
                    String currentDigest;
                    try (FileInputStream input = new FileInputStream(file)) {
                        currentDigest = DigestUtils.md5Hex(input);
                    }
                    if (!currentDigest.equals(digest)) {
                        log.error("Invalid ETag in S3, currentDigest=" + currentDigest + " expectedDigest=" + digest);
                        throw new IOException("Invalid S3 object, it is corrupted: expected digest " + digest
                                + " but got " + currentDigest);
                    }
                }
                return true;
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                if (log.isDebugEnabled()) {
                    long dtms = System.currentTimeMillis() - t0;
                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
                }
            }
        }
    }

    /**
     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
     */
    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {

        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
            super(binaryManager);
        }

        @Override
        public String getId() {
            return "s3:" + binaryManager.bucketName;
        }

        @Override
        public Set<String> getUnmarkedBlobs() {
            // list S3 objects in the bucket
            // record those not marked
            Set<String> unmarked = new HashSet<>();
            ObjectListing list = null;
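            // page through all objects under the prefix; listNextBatchOfObjects follows truncated listings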
            do {
                if (list == null) {
                    list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix);
                } else {
                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
                }
                int prefixLength = binaryManager.bucketNamePrefix.length();
                for (S3ObjectSummary summary : list.getObjectSummaries()) {
                    String digest = summary.getKey().substring(prefixLength);
                    if (!isMD5(digest)) {
                        // ignore files that cannot be MD5 digests, for safety
                        continue;
                    }
                    long length = summary.getSize();
                    if (marked.contains(digest)) {
                        status.numBinaries++;
                        status.sizeBinaries += length;
                    } else {
                        status.numBinariesGC++;
                        status.sizeBinariesGC += length;
                        // record file to delete
                        unmarked.add(digest);
                        marked.remove(digest); // optimize memory
                    }
                }
            } while (list.isTruncated());

            return unmarked;
        }
    }

    // ******************** BlobProvider ********************

    @Override
    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
        String key = bucketNamePrefix + digest;
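        // directDownloadExpire is in seconds; the presigned URL is valid for that long from now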
        Date expiration = new Date();
        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null));
        request.setExpiration(expiration);
        URL url = amazonS3.generatePresignedUrl(request);
        try {
            return url.toURI();
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }

}