001/* 002 * (C) Copyright 2011-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mathieu Guillaume 018 * Florent Guillaume 019 * Luís Duarte 020 */ 021package org.nuxeo.ecm.core.storage.sql; 022 023import static org.apache.commons.lang3.StringUtils.isBlank; 024import static org.apache.commons.lang3.StringUtils.isNotBlank; 025import static org.nuxeo.ecm.core.storage.sql.S3Utils.NON_MULTIPART_COPY_MAX_SIZE; 026 027import java.io.File; 028import java.io.FileInputStream; 029import java.io.IOException; 030import java.net.URI; 031import java.net.URISyntaxException; 032import java.net.URL; 033import java.security.GeneralSecurityException; 034import java.security.KeyPair; 035import java.security.KeyStore; 036import java.security.PrivateKey; 037import java.security.PublicKey; 038import java.security.cert.Certificate; 039import java.util.Collection; 040import java.util.Date; 041import java.util.HashSet; 042import java.util.Set; 043import java.util.regex.Pattern; 044 045import javax.servlet.http.HttpServletRequest; 046 047import org.apache.commons.codec.digest.DigestUtils; 048import org.apache.commons.lang3.StringUtils; 049import org.apache.commons.logging.Log; 050import org.apache.commons.logging.LogFactory; 051import org.nuxeo.common.Environment; 052import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector; 053import org.nuxeo.ecm.blob.AbstractCloudBinaryManager; 054import org.nuxeo.ecm.core.api.Blob; 055import org.nuxeo.ecm.core.api.NuxeoException; 056import org.nuxeo.ecm.core.blob.BlobManager; 057import org.nuxeo.ecm.core.blob.BlobProvider; 058import org.nuxeo.ecm.core.blob.ManagedBlob; 059import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 060import org.nuxeo.ecm.core.blob.binary.FileStorage; 061import org.nuxeo.runtime.api.Framework; 062import org.nuxeo.runtime.aws.NuxeoAWSRegionProvider; 063 064import com.amazonaws.AmazonClientException; 065import com.amazonaws.AmazonServiceException; 066import com.amazonaws.ClientConfiguration; 067import com.amazonaws.HttpMethod; 068import com.amazonaws.auth.AWSCredentialsProvider; 069import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; 070import com.amazonaws.services.s3.AmazonS3; 071import com.amazonaws.services.s3.AmazonS3Builder; 072import com.amazonaws.services.s3.AmazonS3ClientBuilder; 073import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder; 074import com.amazonaws.services.s3.model.AmazonS3Exception; 075import com.amazonaws.services.s3.model.CannedAccessControlList; 076import com.amazonaws.services.s3.model.CryptoConfiguration; 077import com.amazonaws.services.s3.model.EncryptedPutObjectRequest; 078import com.amazonaws.services.s3.model.EncryptionMaterials; 079import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; 080import com.amazonaws.services.s3.model.GetObjectRequest; 081import com.amazonaws.services.s3.model.ListObjectsRequest; 082import com.amazonaws.services.s3.model.ObjectListing; 083import com.amazonaws.services.s3.model.ObjectMetadata; 084import com.amazonaws.services.s3.model.PutObjectRequest; 085import com.amazonaws.services.s3.model.S3ObjectSummary; 086import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; 087import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider; 088import com.amazonaws.services.s3.transfer.Download; 089import com.amazonaws.services.s3.transfer.TransferManager; 090import com.amazonaws.services.s3.transfer.TransferManagerBuilder; 091import com.amazonaws.services.s3.transfer.Upload; 092 093/** 094 * A Binary Manager that stores binaries as S3 BLOBs 095 * <p> 096 * The BLOBs are cached locally on first access for efficiency. 097 * <p> 098 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 099 * if accessed before the stream. 100 */ 101public class S3BinaryManager extends AbstractCloudBinaryManager { 102 103 private static final String MD5 = "MD5"; // must be MD5 for Etag 104 105 @Override 106 protected String getDefaultDigestAlgorithm() { 107 return MD5; 108 } 109 110 private static final Log log = LogFactory.getLog(S3BinaryManager.class); 111 112 public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage"; 113 114 public static final String BUCKET_NAME_PROPERTY = "bucket"; 115 116 public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix"; 117 118 public static final String BUCKET_REGION_PROPERTY = "region"; 119 120 public static final String AWS_ID_PROPERTY = "awsid"; 121 122 public static final String AWS_SECRET_PROPERTY = "awssecret"; 123 124 /** 125 * @since 10.10 126 */ 127 public static final String AWS_SESSION_TOKEN_PROPERTY = "awstoken"; 128 129 /** AWS ClientConfiguration default 50 */ 130 public static final String CONNECTION_MAX_PROPERTY = "connection.max"; 131 132 /** AWS ClientConfiguration default 3 (with exponential backoff) */ 133 public static final String CONNECTION_RETRY_PROPERTY = "connection.retry"; 134 135 /** AWS ClientConfiguration default 50*1000 = 50s */ 136 public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout"; 137 138 /** AWS ClientConfiguration default 50*1000 = 50s */ 139 public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout"; 140 141 public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file"; 142 143 public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password"; 144 145 public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside"; 146 147 public static final String SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY = "crypt.kms.key"; 148 149 public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias"; 150 151 public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password"; 152 153 public static final String ENDPOINT_PROPERTY = "endpoint"; 154 155 /** 156 * @since 10.3 157 */ 158 public static final String PATHSTYLEACCESS_PROPERTY = "pathstyleaccess"; 159 160 public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3"; 161 162 public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire"; 163 164 public static final String DELIMITER = "/"; 165 166 private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}"); 167 168 protected String bucketName; 169 170 protected String bucketNamePrefix; 171 172 protected AWSCredentialsProvider awsCredentialsProvider; 173 174 protected ClientConfiguration clientConfiguration; 175 176 protected EncryptionMaterials encryptionMaterials; 177 178 protected boolean isEncrypted; 179 180 protected CryptoConfiguration cryptoConfiguration; 181 182 protected boolean useServerSideEncryption; 183 184 protected String serverSideKMSKeyID; 185 186 protected AmazonS3 amazonS3; 187 188 protected TransferManager transferManager; 189 190 @Override 191 public void close() { 192 // this also shuts down the AmazonS3Client 193 transferManager.shutdownNow(); 194 super.close(); 195 } 196 197 /** 198 * Aborts uploads that crashed and are older than 1 day. 199 * 200 * @since 7.2 201 */ 202 protected void abortOldUploads() throws IOException { 203 int oneDay = 1000 * 60 * 60 * 24; 204 try { 205 transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay)); 206 } catch (AmazonS3Exception e) { 207 if (e.getStatusCode() == 400 || e.getStatusCode() == 404) { 208 log.error("Your cloud provider does not support aborting old uploads"); 209 return; 210 } 211 throw new IOException("Failed to abort old uploads", e); 212 } 213 } 214 215 @Override 216 protected void setupCloudClient() throws IOException { 217 // Get settings from the configuration 218 bucketName = getProperty(BUCKET_NAME_PROPERTY); 219 // as bucket prefix is optional we don't want to use the fallback mechanism 220 bucketNamePrefix = StringUtils.defaultString(properties.get(BUCKET_PREFIX_PROPERTY)); 221 String bucketRegion = getProperty(BUCKET_REGION_PROPERTY); 222 if (isBlank(bucketRegion)) { 223 bucketRegion = NuxeoAWSRegionProvider.getInstance().getRegion(); 224 } 225 String awsID = getProperty(AWS_ID_PROPERTY); 226 String awsSecret = getProperty(AWS_SECRET_PROPERTY); 227 String awsToken = getProperty(AWS_SESSION_TOKEN_PROPERTY); 228 229 String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST); 230 String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT); 231 String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN); 232 String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD); 233 234 int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY); 235 int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY); 236 int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY); 237 int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY); 238 239 String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY); 240 String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY); 241 String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY); 242 String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY); 243 String endpoint = getProperty(ENDPOINT_PROPERTY); 244 boolean pathStyleAccessEnabled = getBooleanProperty(PATHSTYLEACCESS_PROPERTY); 245 String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY); 246 if (isNotBlank(sseprop)) { 247 useServerSideEncryption = Boolean.parseBoolean(sseprop); 248 serverSideKMSKeyID = getProperty(SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY); 249 } 250 251 if (isBlank(bucketName)) { 252 throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY); 253 } 254 255 if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith(DELIMITER)) { 256 log.warn(String.format("%s %s S3 bucket prefix should end with '/' : added automatically.", 257 BUCKET_PREFIX_PROPERTY, bucketNamePrefix)); 258 bucketNamePrefix += DELIMITER; 259 } 260 if (isNotBlank(namespace)) { 261 // use namespace as an additional prefix 262 bucketNamePrefix += namespace; 263 if (!bucketNamePrefix.endsWith(DELIMITER)) { 264 bucketNamePrefix += DELIMITER; 265 } 266 } 267 268 // set up credentials 269 awsCredentialsProvider = S3Utils.getAWSCredentialsProvider(awsID, awsSecret, awsToken); 270 271 // set up client configuration 272 clientConfiguration = new ClientConfiguration(); 273 if (isNotBlank(proxyHost)) { 274 clientConfiguration.setProxyHost(proxyHost); 275 } 276 if (isNotBlank(proxyPort)) { 277 clientConfiguration.setProxyPort(Integer.parseInt(proxyPort)); 278 } 279 if (isNotBlank(proxyLogin)) { 280 clientConfiguration.setProxyUsername(proxyLogin); 281 } 282 if (proxyPassword != null) { // could be blank 283 clientConfiguration.setProxyPassword(proxyPassword); 284 } 285 if (maxConnections > 0) { 286 clientConfiguration.setMaxConnections(maxConnections); 287 } 288 if (maxErrorRetry >= 0) { // 0 is allowed 289 clientConfiguration.setMaxErrorRetry(maxErrorRetry); 290 } 291 if (connectionTimeout >= 0) { // 0 is allowed 292 clientConfiguration.setConnectionTimeout(connectionTimeout); 293 } 294 if (socketTimeout >= 0) { // 0 is allowed 295 clientConfiguration.setSocketTimeout(socketTimeout); 296 } 297 298 // set up encryption 299 encryptionMaterials = null; 300 if (isNotBlank(keystoreFile)) { 301 boolean confok = true; 302 if (keystorePass == null) { // could be blank 303 log.error("Keystore password missing"); 304 confok = false; 305 } 306 if (isBlank(privkeyAlias)) { 307 log.error("Key alias missing"); 308 confok = false; 309 } 310 if (privkeyPass == null) { // could be blank 311 log.error("Key password missing"); 312 confok = false; 313 } 314 if (!confok) { 315 throw new RuntimeException("S3 Crypto configuration incomplete"); 316 } 317 try { 318 // Open keystore 319 File ksFile = new File(keystoreFile); 320 KeyStore keystore; 321 try (FileInputStream ksStream = new FileInputStream(ksFile)) { 322 keystore = KeyStore.getInstance(KeyStore.getDefaultType()); 323 keystore.load(ksStream, keystorePass.toCharArray()); 324 } 325 // Get keypair for alias 326 if (!keystore.isKeyEntry(privkeyAlias)) { 327 throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias"); 328 } 329 PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray()); 330 Certificate cert = keystore.getCertificate(privkeyAlias); 331 PublicKey pubKey = cert.getPublicKey(); 332 KeyPair keypair = new KeyPair(pubKey, privKey); 333 // Get encryptionMaterials from keypair 334 encryptionMaterials = new EncryptionMaterials(keypair); 335 cryptoConfiguration = new CryptoConfiguration(); 336 } catch (IOException | GeneralSecurityException e) { 337 throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e); 338 } 339 } 340 isEncrypted = encryptionMaterials != null; 341 342 AmazonS3Builder<?, ?> s3Builder; 343 // Try to create bucket if it doesn't exist 344 if (!isEncrypted) { 345 s3Builder = AmazonS3ClientBuilder.standard() 346 .withCredentials(awsCredentialsProvider) 347 .withClientConfiguration(clientConfiguration); 348 349 } else { 350 s3Builder = AmazonS3EncryptionClientBuilder.standard() 351 .withClientConfiguration(clientConfiguration) 352 .withCryptoConfiguration(cryptoConfiguration) 353 .withCredentials(awsCredentialsProvider) 354 .withEncryptionMaterials(new StaticEncryptionMaterialsProvider( 355 encryptionMaterials)); 356 } 357 if (pathStyleAccessEnabled) { 358 log.debug("Path-style access enabled"); 359 s3Builder.enablePathStyleAccess(); 360 } 361 if (isNotBlank(endpoint)) { 362 s3Builder = s3Builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, bucketRegion)); 363 } else { 364 s3Builder = s3Builder.withRegion(bucketRegion); 365 } 366 367 amazonS3 = s3Builder.build(); 368 369 try { 370 if (!amazonS3.doesBucketExist(bucketName)) { 371 amazonS3.createBucket(bucketName); 372 amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private); 373 } 374 } catch (AmazonClientException e) { 375 throw new IOException(e); 376 } 377 378 // compat for NXP-17895, using "downloadfroms3", to be removed 379 // these two fields have already been initialized by the base class initialize() 380 // using standard property "directdownload" 381 String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT); 382 if (dd != null) { 383 directDownload = Boolean.parseBoolean(dd); 384 } 385 int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT); 386 if (dde >= 0) { 387 directDownloadExpire = dde; 388 } 389 390 transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build(); 391 abortOldUploads(); 392 } 393 394 protected void removeBinary(String digest) { 395 amazonS3.deleteObject(bucketName, bucketNamePrefix + digest); 396 } 397 398 @Override 399 protected String getSystemPropertyPrefix() { 400 return SYSTEM_PROPERTY_PREFIX; 401 } 402 403 @Override 404 protected BinaryGarbageCollector instantiateGarbageCollector() { 405 return new S3BinaryGarbageCollector(this); 406 } 407 408 @Override 409 public void removeBinaries(Collection<String> digests) { 410 digests.forEach(this::removeBinary); 411 } 412 413 protected static boolean isMissingKey(AmazonClientException e) { 414 if (e instanceof AmazonServiceException) { 415 AmazonServiceException ase = (AmazonServiceException) e; 416 return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode()) 417 || "Not Found".equals(e.getMessage()); 418 } 419 return false; 420 } 421 422 public static boolean isMD5(String digest) { 423 return MD5_RE.matcher(digest).matches(); 424 } 425 426 /** 427 * Used in the healthCheck; the transferManager should be initialized and the bucket accessible 428 * 429 * @since 9.3 430 */ 431 public boolean canAccessBucket() { 432 return transferManager != null && transferManager.getAmazonS3Client().doesBucketExist(bucketName); 433 } 434 435 @Override 436 protected FileStorage getFileStorage() { 437 return new S3FileStorage(); 438 } 439 440 /** 441 * Gets the AWSCredentialsProvider. 442 * 443 * @since 10.2 444 */ 445 public AWSCredentialsProvider getAwsCredentialsProvider() { 446 return awsCredentialsProvider; 447 } 448 449 /** 450 * Gets AmazonS3. 451 * 452 * @since 10.2 453 */ 454 public AmazonS3 getAmazonS3() { 455 return amazonS3; 456 } 457 458 @Override 459 public String writeBlob(Blob blob) throws IOException { 460 // Attempt to do S3 Copy if the Source Blob provider is also S3 461 if (blob instanceof ManagedBlob) { 462 ManagedBlob managedBlob = (ManagedBlob) blob; 463 BlobProvider blobProvider = Framework.getService(BlobManager.class) 464 .getBlobProvider(managedBlob.getProviderId()); 465 if (blobProvider instanceof S3BinaryManager && blobProvider != this) { 466 // use S3 direct copy as the source blob provider is also S3 467 String key = copyBlob((S3BinaryManager) blobProvider, managedBlob.getKey()); 468 if (key != null) { 469 return key; 470 } 471 } 472 } 473 return super.writeBlob(blob); 474 } 475 476 /** 477 * Copies a blob. Returns {@code null} if the copy was not possible. 478 * 479 * @param sourceBlobProvider the source blob provider 480 * @param blobKey the source blob key 481 * @return the copied blob key, or {@code null} if the copy was not possible 482 * @throws IOException 483 * @since 10.1 484 */ 485 protected String copyBlob(S3BinaryManager sourceBlobProvider, String blobKey) throws IOException { 486 String digest = blobKey; 487 int colon = digest.indexOf(':'); 488 if (colon >= 0) { 489 digest = digest.substring(colon + 1); 490 } 491 String sourceBucketName = sourceBlobProvider.bucketName; 492 String sourceKey = sourceBlobProvider.bucketNamePrefix + digest; 493 String key = bucketNamePrefix + digest; 494 long t0 = 0; 495 if (log.isDebugEnabled()) { 496 t0 = System.currentTimeMillis(); 497 log.debug("copying blob " + sourceKey + " to " + key); 498 } 499 500 try { 501 amazonS3.getObjectMetadata(bucketName, key); 502 if (log.isDebugEnabled()) { 503 log.debug("blob " + key + " is already in S3"); 504 } 505 return digest; 506 } catch (AmazonServiceException e) { 507 if (!isMissingKey(e)) { 508 throw new IOException(e); 509 } 510 // object does not exist, just continue 511 } 512 513 // not already present -> copy the blob 514 ObjectMetadata sourceMetadata; 515 try { 516 sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceKey); 517 } catch (AmazonServiceException e) { 518 throw new NuxeoException("Source blob does not exists: s3://" + sourceBucketName + "/" + sourceKey, e); 519 } 520 try { 521 if (sourceMetadata.getContentLength() > NON_MULTIPART_COPY_MAX_SIZE) { 522 S3Utils.copyFileMultipart(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true); 523 } else { 524 S3Utils.copyFile(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true); 525 } 526 if (log.isDebugEnabled()) { 527 long dtms = System.currentTimeMillis() - t0; 528 log.debug("copied blob " + sourceKey + " to " + key + " in " + dtms + "ms"); 529 } 530 return digest; 531 } catch (AmazonServiceException e) { 532 log.warn("direct S3 copy not supported, please check your keys and policies", e); 533 return null; 534 } 535 } 536 537 public class S3FileStorage implements FileStorage { 538 539 @Override 540 public void storeFile(String digest, File file) throws IOException { 541 long t0 = 0; 542 if (log.isDebugEnabled()) { 543 t0 = System.currentTimeMillis(); 544 log.debug("storing blob " + digest + " to S3"); 545 } 546 String key = bucketNamePrefix + digest; 547 try { 548 amazonS3.getObjectMetadata(bucketName, key); 549 if (log.isDebugEnabled()) { 550 log.debug("blob " + digest + " is already in S3"); 551 } 552 } catch (AmazonClientException e) { 553 if (!isMissingKey(e)) { 554 throw new IOException(e); 555 } 556 // not already present -> store the blob 557 PutObjectRequest request; 558 if (!isEncrypted) { 559 request = new PutObjectRequest(bucketName, key, file); 560 if (useServerSideEncryption) { 561 ObjectMetadata objectMetadata = new ObjectMetadata(); 562 if (isNotBlank(serverSideKMSKeyID)) { 563 SSEAwsKeyManagementParams keyManagementParams = new SSEAwsKeyManagementParams( 564 serverSideKMSKeyID); 565 request = request.withSSEAwsKeyManagementParams(keyManagementParams); 566 } else { 567 objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); 568 } 569 request.setMetadata(objectMetadata); 570 } 571 } else { 572 request = new EncryptedPutObjectRequest(bucketName, key, file); 573 } 574 Upload upload = transferManager.upload(request); 575 try { 576 upload.waitForUploadResult(); 577 } catch (AmazonClientException ee) { 578 throw new IOException(ee); 579 } catch (InterruptedException ee) { 580 Thread.currentThread().interrupt(); 581 throw new RuntimeException(ee); 582 } finally { 583 if (log.isDebugEnabled()) { 584 long dtms = System.currentTimeMillis() - t0; 585 log.debug("stored blob " + digest + " to S3 in " + dtms + "ms"); 586 } 587 } 588 } 589 } 590 591 @Override 592 public boolean fetchFile(String digest, File file) throws IOException { 593 long t0 = 0; 594 if (log.isDebugEnabled()) { 595 t0 = System.currentTimeMillis(); 596 log.debug("fetching blob " + digest + " from S3"); 597 } 598 try { 599 Download download = transferManager.download( 600 new GetObjectRequest(bucketName, bucketNamePrefix + digest), file); 601 download.waitForCompletion(); 602 // Check ETag it is by default MD5 if not multipart 603 if (!isEncrypted && !digest.equals(download.getObjectMetadata().getETag())) { 604 // In case of multipart it will happen, verify the downloaded file 605 String currentDigest; 606 try (FileInputStream input = new FileInputStream(file)) { 607 currentDigest = DigestUtils.md5Hex(input); 608 } 609 if (!currentDigest.equals(digest)) { 610 log.error("Invalid ETag in S3, currentDigest=" + currentDigest + " expectedDigest=" + digest); 611 throw new IOException("Invalid S3 object, it is corrupted expected digest is " + digest 612 + " got " + currentDigest); 613 } 614 } 615 return true; 616 } catch (AmazonClientException e) { 617 if (!isMissingKey(e)) { 618 throw new IOException(e); 619 } 620 return false; 621 } catch (InterruptedException e) { 622 Thread.currentThread().interrupt(); 623 throw new RuntimeException(e); 624 } finally { 625 if (log.isDebugEnabled()) { 626 long dtms = System.currentTimeMillis() - t0; 627 log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms"); 628 } 629 } 630 631 } 632 } 633 634 /** 635 * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory. 636 */ 637 public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> { 638 639 protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) { 640 super(binaryManager); 641 } 642 643 @Override 644 public String getId() { 645 return "s3:" + binaryManager.bucketName; 646 } 647 648 @Override 649 public Set<String> getUnmarkedBlobs() { 650 // list S3 objects in the bucket 651 // record those not marked 652 Set<String> unmarked = new HashSet<>(); 653 ObjectListing list = null; 654 do { 655 if (list == null) { 656 // use delimiter to avoid useless listing of objects in "subdirectories" 657 ListObjectsRequest listObjectsRequest = new ListObjectsRequest(binaryManager.bucketName, 658 binaryManager.bucketNamePrefix, null, DELIMITER, null); 659 list = binaryManager.amazonS3.listObjects(listObjectsRequest); 660 } else { 661 list = binaryManager.amazonS3.listNextBatchOfObjects(list); 662 } 663 int prefixLength = binaryManager.bucketNamePrefix.length(); 664 for (S3ObjectSummary summary : list.getObjectSummaries()) { 665 String digest = summary.getKey().substring(prefixLength); 666 if (!isMD5(digest)) { 667 // ignore files that cannot be MD5 digests for 668 // safety 669 continue; 670 } 671 long length = summary.getSize(); 672 if (marked.contains(digest)) { 673 status.numBinaries++; 674 status.sizeBinaries += length; 675 } else { 676 status.numBinariesGC++; 677 status.sizeBinariesGC += length; 678 // record file to delete 679 unmarked.add(digest); 680 marked.remove(digest); // optimize memory 681 } 682 } 683 } while (list.isTruncated()); 684 685 return unmarked; 686 } 687 } 688 689 // ******************** BlobProvider ******************** 690 691 @Override 692 protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException { 693 String key = bucketNamePrefix + digest; 694 Date expiration = new Date(); 695 expiration.setTime(expiration.getTime() + directDownloadExpire * 1000); 696 GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET); 697 request.addRequestParameter("response-content-type", getContentTypeHeader(blob)); 698 request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null)); 699 request.setExpiration(expiration); 700 URL url = amazonS3.generatePresignedUrl(request); 701 try { 702 return url.toURI(); 703 } catch (URISyntaxException e) { 704 throw new IOException(e); 705 } 706 } 707 708}