001/* 002 * (C) Copyright 2011-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mathieu Guillaume 018 * Florent Guillaume 019 * Luís Duarte 020 */ 021package org.nuxeo.ecm.core.storage.sql; 022 023import static org.apache.commons.lang3.StringUtils.isBlank; 024import static org.apache.commons.lang3.StringUtils.isNotBlank; 025import static org.nuxeo.ecm.core.storage.sql.S3Utils.NON_MULTIPART_COPY_MAX_SIZE; 026 027import java.io.File; 028import java.io.FileInputStream; 029import java.io.IOException; 030import java.net.URI; 031import java.net.URISyntaxException; 032import java.net.URL; 033import java.security.GeneralSecurityException; 034import java.security.KeyPair; 035import java.security.KeyStore; 036import java.security.PrivateKey; 037import java.security.PublicKey; 038import java.security.cert.Certificate; 039import java.util.Collection; 040import java.util.Date; 041import java.util.HashSet; 042import java.util.Set; 043import java.util.regex.Pattern; 044 045import javax.servlet.http.HttpServletRequest; 046 047import org.apache.commons.codec.digest.DigestUtils; 048import org.apache.commons.lang3.StringUtils; 049import org.apache.commons.logging.Log; 050import org.apache.commons.logging.LogFactory; 051import org.nuxeo.common.Environment; 052import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector; 053import org.nuxeo.ecm.blob.AbstractCloudBinaryManager; 
054import org.nuxeo.ecm.core.api.Blob; 055import org.nuxeo.ecm.core.api.NuxeoException; 056import org.nuxeo.ecm.core.blob.BlobManager; 057import org.nuxeo.ecm.core.blob.BlobProvider; 058import org.nuxeo.ecm.core.blob.ManagedBlob; 059import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 060import org.nuxeo.ecm.core.blob.binary.FileStorage; 061import org.nuxeo.runtime.api.Framework; 062 063import com.amazonaws.AmazonClientException; 064import com.amazonaws.AmazonServiceException; 065import com.amazonaws.ClientConfiguration; 066import com.amazonaws.HttpMethod; 067import com.amazonaws.auth.AWSCredentialsProvider; 068import com.amazonaws.client.builder.AwsClientBuilder; 069import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; 070import com.amazonaws.services.s3.AmazonS3; 071import com.amazonaws.services.s3.AmazonS3ClientBuilder; 072import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder; 073import com.amazonaws.services.s3.model.AmazonS3Exception; 074import com.amazonaws.services.s3.model.CannedAccessControlList; 075import com.amazonaws.services.s3.model.CryptoConfiguration; 076import com.amazonaws.services.s3.model.EncryptedPutObjectRequest; 077import com.amazonaws.services.s3.model.EncryptionMaterials; 078import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; 079import com.amazonaws.services.s3.model.GetObjectRequest; 080import com.amazonaws.services.s3.model.ObjectListing; 081import com.amazonaws.services.s3.model.ObjectMetadata; 082import com.amazonaws.services.s3.model.PutObjectRequest; 083import com.amazonaws.services.s3.model.S3ObjectSummary; 084import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; 085import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider; 086import com.amazonaws.services.s3.transfer.Download; 087import com.amazonaws.services.s3.transfer.TransferManager; 088import com.amazonaws.services.s3.transfer.TransferManagerBuilder; 089import 
com.amazonaws.services.s3.transfer.Upload; 090import com.google.common.base.MoreObjects; 091 092/** 093 * A Binary Manager that stores binaries as S3 BLOBs 094 * <p> 095 * The BLOBs are cached locally on first access for efficiency. 096 * <p> 097 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 098 * if accessed before the stream. 099 */ 100public class S3BinaryManager extends AbstractCloudBinaryManager { 101 102 private static final String MD5 = "MD5"; // must be MD5 for Etag 103 104 @Override 105 protected String getDefaultDigestAlgorithm() { 106 return MD5; 107 } 108 109 private static final Log log = LogFactory.getLog(S3BinaryManager.class); 110 111 public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage"; 112 113 public static final String BUCKET_NAME_PROPERTY = "bucket"; 114 115 public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix"; 116 117 public static final String BUCKET_REGION_PROPERTY = "region"; 118 119 public static final String DEFAULT_BUCKET_REGION = "us-east-1"; // US East 120 121 public static final String AWS_ID_PROPERTY = "awsid"; 122 123 public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID"; 124 125 public static final String AWS_SECRET_PROPERTY = "awssecret"; 126 127 public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY"; 128 129 /** AWS ClientConfiguration default 50 */ 130 public static final String CONNECTION_MAX_PROPERTY = "connection.max"; 131 132 /** AWS ClientConfiguration default 3 (with exponential backoff) */ 133 public static final String CONNECTION_RETRY_PROPERTY = "connection.retry"; 134 135 /** AWS ClientConfiguration default 50*1000 = 50s */ 136 public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout"; 137 138 /** AWS ClientConfiguration default 50*1000 = 50s */ 139 public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout"; 140 141 public static final String KEYSTORE_FILE_PROPERTY = 
"crypt.keystore.file"; 142 143 public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password"; 144 145 public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside"; 146 147 public static final String SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY = "crypt.kms.key"; 148 149 public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias"; 150 151 public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password"; 152 153 public static final String ENDPOINT_PROPERTY = "endpoint"; 154 155 public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3"; 156 157 public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire"; 158 159 private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}"); 160 161 protected String bucketName; 162 163 protected String bucketNamePrefix; 164 165 protected AWSCredentialsProvider awsCredentialsProvider; 166 167 protected ClientConfiguration clientConfiguration; 168 169 protected EncryptionMaterials encryptionMaterials; 170 171 protected boolean isEncrypted; 172 173 protected CryptoConfiguration cryptoConfiguration; 174 175 protected boolean useServerSideEncryption; 176 177 protected String serverSideKMSKeyID; 178 179 protected AmazonS3 amazonS3; 180 181 protected TransferManager transferManager; 182 183 @Override 184 public void close() { 185 // this also shuts down the AmazonS3Client 186 transferManager.shutdownNow(); 187 super.close(); 188 } 189 190 /** 191 * Aborts uploads that crashed and are older than 1 day. 
     *
     * @since 7.2
     */
    protected void abortOldUploads() throws IOException {
        int oneDay = 1000 * 60 * 60 * 24;
        try {
            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
        } catch (AmazonS3Exception e) {
            // some S3-compatible providers reject this call; degrade to a logged error
            if (e.getStatusCode() == 400 || e.getStatusCode() == 404) {
                log.error("Your cloud provider does not support aborting old uploads");
                return;
            }
            throw new IOException("Failed to abort old uploads", e);
        }
    }

    /**
     * Reads all configuration, builds the S3 client (plain or client-side encrypted), ensures the bucket
     * exists, and creates the {@link TransferManager}. Statement order matters: the compat direct-download
     * properties must be read after the base class initialize(), and abortOldUploads() needs the
     * transferManager built last.
     */
    @Override
    protected void setupCloudClient() throws IOException {
        // Get settings from the configuration
        bucketName = getProperty(BUCKET_NAME_PROPERTY);
        bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY);
        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
        if (isBlank(bucketRegion)) {
            bucketRegion = DEFAULT_BUCKET_REGION;
        }
        String awsID = getProperty(AWS_ID_PROPERTY);
        String awsSecret = getProperty(AWS_SECRET_PROPERTY);

        // proxy settings come from the framework-wide HTTP proxy configuration
        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);

        // getIntProperty presumably returns a negative value when unset — TODO confirm in base class
        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);

        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
        String endpoint = getProperty(ENDPOINT_PROPERTY);
        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
        if (isNotBlank(sseprop)) {
            useServerSideEncryption = Boolean.parseBoolean(sseprop);
            serverSideKMSKeyID = getProperty(SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY);
        }

        // Fallback on default env keys for ID and secret
        if (isBlank(awsID)) {
            awsID = System.getenv(AWS_ID_ENV);
        }
        if (isBlank(awsSecret)) {
            awsSecret = System.getenv(AWS_SECRET_ENV);
        }

        if (isBlank(bucketName)) {
            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
        }

        // normalize the prefix so key construction (prefix + digest) stays unambiguous
        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) {
            log.warn(String.format("%s %s S3 bucket prefix should end with '/' : added automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
            bucketNamePrefix += "/";
        }
        // set up credentials
        awsCredentialsProvider = S3Utils.getAWSCredentialsProvider(awsID, awsSecret);

        // set up client configuration; only override the AWS SDK defaults when explicitly configured
        clientConfiguration = new ClientConfiguration();
        if (isNotBlank(proxyHost)) {
            clientConfiguration.setProxyHost(proxyHost);
        }
        if (isNotBlank(proxyPort)) {
            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
        }
        if (isNotBlank(proxyLogin)) {
            clientConfiguration.setProxyUsername(proxyLogin);
        }
        if (proxyPassword != null) { // could be blank
            clientConfiguration.setProxyPassword(proxyPassword);
        }
        if (maxConnections > 0) {
            clientConfiguration.setMaxConnections(maxConnections);
        }
        if (maxErrorRetry >= 0) { // 0 is allowed
            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
        }
        if (connectionTimeout >= 0) { // 0 is allowed
            clientConfiguration.setConnectionTimeout(connectionTimeout);
        }
        if (socketTimeout >= 0) { // 0 is allowed
            clientConfiguration.setSocketTimeout(socketTimeout);
        }

        // set up client-side encryption from a local keystore, if configured
        encryptionMaterials = null;
        if (isNotBlank(keystoreFile)) {
            // validate the whole crypto config before failing, so all missing items are logged at once
            boolean confok = true;
            if (keystorePass == null) { // could be blank
                log.error("Keystore password missing");
                confok = false;
            }
            if (isBlank(privkeyAlias)) {
                log.error("Key alias missing");
                confok = false;
            }
            if (privkeyPass == null) { // could be blank
                log.error("Key password missing");
                confok = false;
            }
            if (!confok) {
                throw new RuntimeException("S3 Crypto configuration incomplete");
            }
            try {
                // Open keystore
                File ksFile = new File(keystoreFile);
                KeyStore keystore;
                try (FileInputStream ksStream = new FileInputStream(ksFile)) {
                    keystore = KeyStore.getInstance(KeyStore.getDefaultType());
                    keystore.load(ksStream, keystorePass.toCharArray());
                }
                // Get keypair for alias
                if (!keystore.isKeyEntry(privkeyAlias)) {
                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
                }
                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
                Certificate cert = keystore.getCertificate(privkeyAlias);
                PublicKey pubKey = cert.getPublicKey();
                KeyPair keypair = new KeyPair(pubKey, privKey);
                // Get encryptionMaterials from keypair
                encryptionMaterials = new EncryptionMaterials(keypair);
                cryptoConfiguration = new CryptoConfiguration();
            } catch (IOException | GeneralSecurityException e) {
                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
            }
        }
        isEncrypted = encryptionMaterials != null;

        // choose plain vs encryption client builder; the raw AwsClientBuilder type is their common supertype
        @SuppressWarnings("rawtypes")
        AwsClientBuilder s3Builder;
        // Try to create bucket if it doesn't exist
        if (!isEncrypted) {
            s3Builder = AmazonS3ClientBuilder.standard()
                                             .withCredentials(awsCredentialsProvider)
                                             .withClientConfiguration(clientConfiguration);

        } else {
            s3Builder = AmazonS3EncryptionClientBuilder.standard()
                                                       .withClientConfiguration(clientConfiguration)
                                                       .withCryptoConfiguration(cryptoConfiguration)
                                                       .withCredentials(awsCredentialsProvider)
                                                       .withEncryptionMaterials(new StaticEncryptionMaterialsProvider(encryptionMaterials));
        }
        // an explicit endpoint (S3-compatible storage) is mutually exclusive with a plain region
        if (isNotBlank(endpoint)) {
            s3Builder = s3Builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, bucketRegion));
        } else {
            s3Builder = s3Builder.withRegion(bucketRegion);
        }

        amazonS3 = (AmazonS3) s3Builder.build();

        try {
            if (!amazonS3.doesBucketExist(bucketName)) {
                amazonS3.createBucket(bucketName);
                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
            }
        } catch (AmazonClientException e) {
            throw new IOException(e);
        }

        // compat for NXP-17895, using "downloadfroms3", to be removed
        // these two fields have already been initialized by the base class initialize()
        // using standard property "directdownload"
        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
        if (dd != null) {
            directDownload = Boolean.parseBoolean(dd);
        }
        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
        if (dde >= 0) {
            directDownloadExpire = dde;
        }

        transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build();
        abortOldUploads();
    }

    /** Deletes the S3 object holding the given digest. */
    protected void removeBinary(String digest) {
        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
    }

    @Override
    protected String getSystemPropertyPrefix() {
        return SYSTEM_PROPERTY_PREFIX;
    }

    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new S3BinaryGarbageCollector(this);
    }

    @Override
    public void removeBinaries(Collection<String> digests) {
        digests.forEach(this::removeBinary);
    }

    /**
     * Checks whether the given AWS exception denotes a missing S3 key (404 / NoSuchKey),
     * as opposed to a real failure that must be propagated.
     */
    protected static boolean isMissingKey(AmazonClientException e) {
        if (e instanceof AmazonServiceException) {
            AmazonServiceException ase = (AmazonServiceException) e;
            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
                    || "Not Found".equals(e.getMessage());
        }
        return false;
    }

    /** Returns {@code true} if the string is a lowercase hex MD5 digest. */
    public static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    /**
     * Used in the healthCheck; the transferManager should be initialized and the bucket accessible
     *
     * @since 9.3
     */
    public boolean canAccessBucket() {
        return transferManager != null && transferManager.getAmazonS3Client().doesBucketExist(bucketName);
    }

    @Override
    protected FileStorage getFileStorage() {
        return new S3FileStorage();
    }

    /**
     * Gets the AWSCredentialsProvider.
     *
     * @since 10.2
     */
    public AWSCredentialsProvider getAwsCredentialsProvider() {
        return awsCredentialsProvider;
    }

    /**
     * Gets AmazonS3.
     *
     * @since 10.2
     */
    public AmazonS3 getAmazonS3() {
        return amazonS3;
    }

    @Override
    public String writeBlob(Blob blob) throws IOException {
        // Attempt to do S3 Copy if the Source Blob provider is also S3
        if (blob instanceof ManagedBlob) {
            ManagedBlob managedBlob = (ManagedBlob) blob;
            BlobProvider blobProvider = Framework.getService(BlobManager.class)
                                                 .getBlobProvider(managedBlob.getProviderId());
            if (blobProvider instanceof S3BinaryManager && blobProvider != this) {
                // use S3 direct copy as the source blob provider is also S3
                String key = copyBlob((S3BinaryManager) blobProvider, managedBlob.getKey());
                if (key != null) {
                    return key;
                }
            }
        }
        // server-side copy failed or not applicable: fall back to upload through the local cache
        return super.writeBlob(blob);
    }

    /**
     * Copies a blob. Returns {@code null} if the copy was not possible.
463 * 464 * @param sourceBlobProvider the source blob provider 465 * @param blobKey the source blob key 466 * @return the copied blob key, or {@code null} if the copy was not possible 467 * @throws IOException 468 * @since 10.1 469 */ 470 protected String copyBlob(S3BinaryManager sourceBlobProvider, String blobKey) throws IOException { 471 String digest = blobKey; 472 int colon = digest.indexOf(':'); 473 if (colon >= 0) { 474 digest = digest.substring(colon + 1); 475 } 476 String sourceBucketName = sourceBlobProvider.bucketName; 477 String sourceKey = sourceBlobProvider.bucketNamePrefix + digest; 478 String key = bucketNamePrefix + digest; 479 long t0 = 0; 480 if (log.isDebugEnabled()) { 481 t0 = System.currentTimeMillis(); 482 log.debug("copying blob " + sourceKey + " to " + key); 483 } 484 485 try { 486 amazonS3.getObjectMetadata(bucketName, key); 487 if (log.isDebugEnabled()) { 488 log.debug("blob " + key + " is already in S3"); 489 } 490 return digest; 491 } catch (AmazonServiceException e) { 492 if (!isMissingKey(e)) { 493 throw new IOException(e); 494 } 495 // object does not exist, just continue 496 } 497 498 // not already present -> copy the blob 499 ObjectMetadata sourceMetadata; 500 try { 501 sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceKey); 502 } catch (AmazonServiceException e) { 503 throw new NuxeoException("Source blob does not exists: s3://" + sourceBucketName + "/" + sourceKey, e); 504 } 505 try { 506 if (sourceMetadata.getContentLength() > NON_MULTIPART_COPY_MAX_SIZE) { 507 S3Utils.copyFileMultipart(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true); 508 } else { 509 S3Utils.copyFile(amazonS3, sourceMetadata, sourceBucketName, sourceKey, bucketName, key, true); 510 } 511 if (log.isDebugEnabled()) { 512 long dtms = System.currentTimeMillis() - t0; 513 log.debug("copied blob " + sourceKey + " to " + key + " in " + dtms + "ms"); 514 } 515 return digest; 516 } catch (AmazonServiceException e) { 517 
log.warn("direct S3 copy not supported, please check your keys and policies", e); 518 return null; 519 } 520 } 521 522 public class S3FileStorage implements FileStorage { 523 524 @Override 525 public void storeFile(String digest, File file) throws IOException { 526 long t0 = 0; 527 if (log.isDebugEnabled()) { 528 t0 = System.currentTimeMillis(); 529 log.debug("storing blob " + digest + " to S3"); 530 } 531 String key = bucketNamePrefix + digest; 532 try { 533 amazonS3.getObjectMetadata(bucketName, key); 534 if (log.isDebugEnabled()) { 535 log.debug("blob " + digest + " is already in S3"); 536 } 537 } catch (AmazonClientException e) { 538 if (!isMissingKey(e)) { 539 throw new IOException(e); 540 } 541 // not already present -> store the blob 542 PutObjectRequest request; 543 if (!isEncrypted) { 544 request = new PutObjectRequest(bucketName, key, file); 545 if (useServerSideEncryption) { 546 ObjectMetadata objectMetadata = new ObjectMetadata(); 547 if (isNotBlank(serverSideKMSKeyID)) { 548 SSEAwsKeyManagementParams keyManagementParams = 549 new SSEAwsKeyManagementParams(serverSideKMSKeyID); 550 request = request.withSSEAwsKeyManagementParams(keyManagementParams); 551 } else { 552 objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); 553 } 554 request.setMetadata(objectMetadata); 555 } 556 } else { 557 request = new EncryptedPutObjectRequest(bucketName, key, file); 558 } 559 Upload upload = transferManager.upload(request); 560 try { 561 upload.waitForUploadResult(); 562 } catch (AmazonClientException ee) { 563 throw new IOException(ee); 564 } catch (InterruptedException ee) { 565 Thread.currentThread().interrupt(); 566 throw new RuntimeException(ee); 567 } finally { 568 if (log.isDebugEnabled()) { 569 long dtms = System.currentTimeMillis() - t0; 570 log.debug("stored blob " + digest + " to S3 in " + dtms + "ms"); 571 } 572 } 573 } 574 } 575 576 @Override 577 public boolean fetchFile(String digest, File file) throws IOException { 578 long t0 = 
0; 579 if (log.isDebugEnabled()) { 580 t0 = System.currentTimeMillis(); 581 log.debug("fetching blob " + digest + " from S3"); 582 } 583 try { 584 Download download = transferManager.download( 585 new GetObjectRequest(bucketName, bucketNamePrefix + digest), file); 586 download.waitForCompletion(); 587 // Check ETag it is by default MD5 if not multipart 588 if (!isEncrypted && !digest.equals(download.getObjectMetadata().getETag())) { 589 // In case of multipart it will happen, verify the downloaded file 590 String currentDigest; 591 try (FileInputStream input = new FileInputStream(file)) { 592 currentDigest = DigestUtils.md5Hex(input); 593 } 594 if (!currentDigest.equals(digest)) { 595 log.error("Invalid ETag in S3, currentDigest=" + currentDigest + " expectedDigest=" + digest); 596 throw new IOException("Invalid S3 object, it is corrupted expected digest is " + digest 597 + " got " + currentDigest); 598 } 599 } 600 return true; 601 } catch (AmazonClientException e) { 602 if (!isMissingKey(e)) { 603 throw new IOException(e); 604 } 605 return false; 606 } catch (InterruptedException e) { 607 Thread.currentThread().interrupt(); 608 throw new RuntimeException(e); 609 } finally { 610 if (log.isDebugEnabled()) { 611 long dtms = System.currentTimeMillis() - t0; 612 log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms"); 613 } 614 } 615 616 } 617 } 618 619 /** 620 * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory. 
621 */ 622 public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> { 623 624 protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) { 625 super(binaryManager); 626 } 627 628 @Override 629 public String getId() { 630 return "s3:" + binaryManager.bucketName; 631 } 632 633 @Override 634 public Set<String> getUnmarkedBlobs() { 635 // list S3 objects in the bucket 636 // record those not marked 637 Set<String> unmarked = new HashSet<>(); 638 ObjectListing list = null; 639 do { 640 if (list == null) { 641 list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix); 642 } else { 643 list = binaryManager.amazonS3.listNextBatchOfObjects(list); 644 } 645 int prefixLength = binaryManager.bucketNamePrefix.length(); 646 for (S3ObjectSummary summary : list.getObjectSummaries()) { 647 String digest = summary.getKey().substring(prefixLength); 648 if (!isMD5(digest)) { 649 // ignore files that cannot be MD5 digests for 650 // safety 651 continue; 652 } 653 long length = summary.getSize(); 654 if (marked.contains(digest)) { 655 status.numBinaries++; 656 status.sizeBinaries += length; 657 } else { 658 status.numBinariesGC++; 659 status.sizeBinariesGC += length; 660 // record file to delete 661 unmarked.add(digest); 662 marked.remove(digest); // optimize memory 663 } 664 } 665 } while (list.isTruncated()); 666 667 return unmarked; 668 } 669 } 670 671 // ******************** BlobProvider ******************** 672 673 @Override 674 protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException { 675 String key = bucketNamePrefix + digest; 676 Date expiration = new Date(); 677 expiration.setTime(expiration.getTime() + directDownloadExpire * 1000); 678 GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET); 679 request.addRequestParameter("response-content-type", getContentTypeHeader(blob)); 680 
request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null)); 681 request.setExpiration(expiration); 682 URL url = amazonS3.generatePresignedUrl(request); 683 try { 684 return url.toURI(); 685 } catch (URISyntaxException e) { 686 throw new IOException(e); 687 } 688 } 689 690}