/*
 * (C) Copyright 2011-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Mathieu Guillaume
 *     Florent Guillaume
 *     Luís Duarte
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.blob.s3.S3BlobStoreConfiguration.DISABLE_PROXY_PROPERTY;
import static org.nuxeo.ecm.blob.s3.S3BlobStoreConfiguration.MULTIPART_CLEANUP_DISABLED_PROPERTY;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServletRequest;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
import org.nuxeo.ecm.blob.s3.S3ManagedTransfer;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.aws.NuxeoAWSRegionProvider;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;

/**
 * A Binary Manager that stores binaries as S3 BLOBs
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
 */
public class S3BinaryManager extends AbstractCloudBinaryManager implements S3ManagedTransfer {

    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";

    public static final String BUCKET_NAME_PROPERTY = "bucket";

    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

    public static final String BUCKET_REGION_PROPERTY = "region";

    public static final String AWS_ID_PROPERTY = "awsid";

    public static final String AWS_SECRET_PROPERTY = "awssecret";

    /**
     * @since 10.10
     */
    public static final String AWS_SESSION_TOKEN_PROPERTY = "awstoken";

    /** AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_PROPERTY = "connection.max";

    /** AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";

    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";

    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";

    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";

    public static final String SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY = "crypt.kms.key";

    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";

    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";

    public static final String ENDPOINT_PROPERTY = "endpoint";

    /**
     * @since 10.3
     */
    public static final String PATHSTYLEACCESS_PROPERTY = "pathstyleaccess";

    /** @since 11.1 */
    public static final String ACCELERATE_MODE_PROPERTY = "accelerateMode";

    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";

    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";

    public static final String DELIMITER = "/";

    /** @deprecated since 11.1, now unused */
    @Deprecated
    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    protected String bucketName;

    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    protected EncryptionMaterials encryptionMaterials;

    protected boolean isEncrypted;

    protected CryptoConfiguration cryptoConfiguration;

    protected boolean useServerSideEncryption;

    protected String serverSideKMSKeyID;

    protected AmazonS3 amazonS3;

    protected TransferManager transferManager;

    /**
     * Shuts down the S3 transfer manager (and with it the S3 client), then closes the base manager.
     * <p>
     * Safe to call when {@link #setupCloudClient()} failed part-way: {@code transferManager} is only assigned on the
     * last lines of that method, so it may still be {@code null} here.
     */
    @Override
    public void close() {
        try {
            if (transferManager != null) {
                // this also shuts down the AmazonS3Client
                transferManager.shutdownNow();
            } else if (amazonS3 != null) {
                // setup failed after the client was built but before the transfer manager was created
                amazonS3.shutdown();
            }
        } finally {
            // always release the base class resources, even if the S3 shutdown throws
            super.close();
        }
    }

    /**
     * Aborts uploads that crashed and are older than 1 day.
     * <p>
     * Does nothing when the cleanup is disabled through {@code MULTIPART_CLEANUP_DISABLED_PROPERTY}. Otherwise the
     * cleanup is started in a background daemon thread and this method returns immediately.
     *
     * @since 7.2
     */
    protected void abortOldUploads() {
        if (getBooleanProperty(MULTIPART_CLEANUP_DISABLED_PROPERTY)) {
            log.debug("Cleanup of old multipart uploads is disabled");
            return;
        }
        // Async to avoid issues with transferManager.abortMultipartUploads taking a very long time.
        // See NXP-28571.
        Thread cleanupThread = new Thread(this::abortOldMultipartUploadsInternal,
                "Nuxeo-S3-abortOldMultipartUploads-" + bucketName);
        // best-effort housekeeping: must never keep the JVM alive on shutdown
        cleanupThread.setDaemon(true);
        cleanupThread.start();
    }

    /**
     * Asks S3 to abort the multipart uploads of the bucket that were started more than one day ago.
     * <p>
     * Executed in a separate thread: failures are logged rather than thrown, as there is no caller able to handle an
     * exception.
     */
    protected void abortOldMultipartUploadsInternal() {
        long oneDay = 24L * 60 * 60 * 1000;
        try {
            log.debug("Starting cleanup of old multipart uploads for bucket: " + bucketName);
            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
            log.debug("Cleanup done for bucket: " + bucketName);
        } catch (AmazonS3Exception e) {
            if (e.getStatusCode() == 400 || e.getStatusCode() == 404) {
                log.error("Your cloud provider does not support aborting old uploads");
                return;
            }
            log.error("Failed to abort old uploads", e);
        } catch (AmazonClientException e) {
            // client-side failure (network, credentials, ...)
            log.error("Failed to abort old uploads", e);
        }
    }

    /**
     * Reads the configuration, builds the (optionally client-side encrypting) S3 client, makes sure the bucket exists,
     * creates the transfer manager and finally triggers the cleanup of old multipart uploads.
     *
     * @throws IOException if the bucket cannot be checked or created
     * @throws RuntimeException if the configuration is missing mandatory values or is inconsistent
     */
    @Override
    protected void setupCloudClient() throws IOException {
        // Get settings from the configuration
        bucketName = getProperty(BUCKET_NAME_PROPERTY);
        // as bucket prefix is optional we don't want to use the fallback mechanism
        bucketNamePrefix = StringUtils.defaultString(properties.get(BUCKET_PREFIX_PROPERTY));
        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
        if (isBlank(bucketRegion)) {
            bucketRegion = NuxeoAWSRegionProvider.getInstance().getRegion();
        }
        String awsID = getProperty(AWS_ID_PROPERTY);
        String awsSecret = getProperty(AWS_SECRET_PROPERTY);
        String awsToken = getProperty(AWS_SESSION_TOKEN_PROPERTY);

        boolean proxyDisabled = Framework.isBooleanPropertyTrue(DISABLE_PROXY_PROPERTY);
        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);

        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);

        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
        String endpoint = getProperty(ENDPOINT_PROPERTY);
        boolean accelerateModeEnabled = getBooleanProperty(ACCELERATE_MODE_PROPERTY);
        boolean pathStyleAccessEnabled = getBooleanProperty(PATHSTYLEACCESS_PROPERTY);
        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
        if (isNotBlank(sseprop)) {
            useServerSideEncryption = Boolean.parseBoolean(sseprop);
            serverSideKMSKeyID = getProperty(SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY);
        }

        if (isBlank(bucketName)) {
            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
        }

        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith(DELIMITER)) {
            log.debug(String.format("%s %s S3 bucket prefix should end with '/' : added automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
            bucketNamePrefix += DELIMITER;
        }
        if (isNotBlank(namespace)) {
            // use namespace as an additional prefix
            bucketNamePrefix += namespace;
            if (!bucketNamePrefix.endsWith(DELIMITER)) {
                bucketNamePrefix += DELIMITER;
            }
        }

        // set up credentials
        awsCredentialsProvider = S3Utils.getAWSCredentialsProvider(awsID, awsSecret, awsToken);

        // set up client configuration
        clientConfiguration = new ClientConfiguration();
        if (!proxyDisabled) {
            if (isNotBlank(proxyHost)) {
                clientConfiguration.setProxyHost(proxyHost);
            }
            if (isNotBlank(proxyPort)) {
                try {
                    clientConfiguration.setProxyPort(Integer.parseInt(proxyPort.trim()));
                } catch (NumberFormatException e) {
                    // name the offending setting instead of surfacing a bare NumberFormatException
                    throw new RuntimeException(
                            "Invalid conf: " + Environment.NUXEO_HTTP_PROXY_PORT + "=" + proxyPort, e);
                }
            }
            if (isNotBlank(proxyLogin)) {
                clientConfiguration.setProxyUsername(proxyLogin);
            }
            if (proxyPassword != null) { // could be blank
                clientConfiguration.setProxyPassword(proxyPassword);
            }
        }
        if (maxConnections > 0) {
            clientConfiguration.setMaxConnections(maxConnections);
        }
        if (maxErrorRetry >= 0) { // 0 is allowed
            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
        }
        if (connectionTimeout >= 0) { // 0 is allowed
            clientConfiguration.setConnectionTimeout(connectionTimeout);
        }
        if (socketTimeout >= 0) { // 0 is allowed
            clientConfiguration.setSocketTimeout(socketTimeout);
        }

        // set up encryption
        encryptionMaterials = null;
        if (isNotBlank(keystoreFile)) {
            boolean confok = true;
            if (keystorePass == null) { // could be blank
                log.error("Keystore password missing");
                confok = false;
            }
            if (isBlank(privkeyAlias)) {
                log.error("Key alias missing");
                confok = false;
            }
            if (privkeyPass == null) { // could be blank
                log.error("Key password missing");
                confok = false;
            }
            if (!confok) {
                throw new RuntimeException("S3 Crypto configuration incomplete");
            }
            // Get encryptionMaterials from the keystore keypair
            encryptionMaterials = loadEncryptionMaterials(keystoreFile, keystorePass, privkeyAlias, privkeyPass);
            cryptoConfiguration = new CryptoConfiguration();
        }
        isEncrypted = encryptionMaterials != null;

        // build the S3 client, with client-side encryption if a keystore was configured
        AmazonS3Builder<?, ?> s3Builder;
        if (!isEncrypted) {
            s3Builder = AmazonS3ClientBuilder.standard()
                                             .withCredentials(awsCredentialsProvider)
                                             .withClientConfiguration(clientConfiguration);

        } else {
            s3Builder = AmazonS3EncryptionClientBuilder.standard()
                                                       .withClientConfiguration(clientConfiguration)
                                                       .withCryptoConfiguration(cryptoConfiguration)
                                                       .withCredentials(awsCredentialsProvider)
                                                       .withEncryptionMaterials(new StaticEncryptionMaterialsProvider(
                                                               encryptionMaterials));
        }
        if (pathStyleAccessEnabled) {
            log.debug("Path-style access enabled");
            s3Builder.enablePathStyleAccess();
        }
        if (isNotBlank(endpoint)) {
            s3Builder = s3Builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, bucketRegion));
        } else {
            s3Builder = s3Builder.withRegion(bucketRegion);
        }
        s3Builder.setAccelerateModeEnabled(accelerateModeEnabled);

        amazonS3 = s3Builder.build();

        // Try to create bucket if it doesn't exist
        try {
            if (!amazonS3.doesBucketExist(bucketName)) {
                amazonS3.createBucket(bucketName);
                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
            }
        } catch (AmazonClientException e) {
            throw new IOException(e);
        }

        // compat for NXP-17895, using "downloadfroms3", to be removed
        // these two fields have already been initialized by the base class initialize()
        // using standard property "directdownload"
        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
        if (dd != null) {
            directDownload = Boolean.parseBoolean(dd);
        }
        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
        if (dde >= 0) {
            directDownloadExpire = dde;
        }

        transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build();
        abortOldUploads();
    }

    /**
     * Reads the key pair stored under the given alias of a keystore and wraps it as S3 encryption materials.
     *
     * @param keystoreFile the path of the keystore file
     * @param keystorePass the keystore password (may be blank, not {@code null})
     * @param privkeyAlias the alias of the private key entry
     * @param privkeyPass the private key password (may be blank, not {@code null})
     * @return the encryption materials built from the alias' public and private keys
     * @throws RuntimeException if the keystore cannot be read, or if the alias does not hold a private key with a
     *             certificate
     */
    private static EncryptionMaterials loadEncryptionMaterials(String keystoreFile, String keystorePass,
            String privkeyAlias, String privkeyPass) {
        try {
            // Open keystore
            File ksFile = new File(keystoreFile);
            KeyStore keystore;
            try (FileInputStream ksStream = new FileInputStream(ksFile)) {
                keystore = KeyStore.getInstance(KeyStore.getDefaultType());
                keystore.load(ksStream, keystorePass.toCharArray());
            }
            // Get keypair for alias
            if (!keystore.isKeyEntry(privkeyAlias)) {
                throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
            }
            // a key entry may also hold a secret key: check before casting
            Key key = keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
            if (!(key instanceof PrivateKey)) {
                throw new RuntimeException("Alias " + privkeyAlias + " does not hold a private key");
            }
            // getCertificate returns null when the entry has no certificate
            Certificate cert = keystore.getCertificate(privkeyAlias);
            if (cert == null) {
                throw new RuntimeException("Alias " + privkeyAlias + " has no certificate");
            }
            PublicKey pubKey = cert.getPublicKey();
            KeyPair keypair = new KeyPair(pubKey, (PrivateKey) key);
            return new EncryptionMaterials(keypair);
        } catch (IOException | GeneralSecurityException e) {
            throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
        }
    }

    /**
     * Gets the transfer manager created by {@link #setupCloudClient()}.
     */
    @Override
    public TransferManager getTransferManager() {
        return transferManager;
    }

    /**
     * Deletes from the bucket the object stored for the given digest.
     *
     * @param digest the binary digest (the bucket prefix is added by this method)
     */
    protected void removeBinary(String digest) {
        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
    }

    @Override
    protected String getSystemPropertyPrefix() {
        return SYSTEM_PROPERTY_PREFIX;
    }

    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new S3BinaryGarbageCollector(this);
    }

    /**
     * Deletes from the bucket, one by one, the objects stored for the given digests.
     */
    @Override
    public void removeBinaries(Collection<String> digests) {
        digests.forEach(this::removeBinary);
    }

    /**
     * Checks whether an AWS exception means that the requested object does not exist.
     *
     * @return {@code true} for a service exception with status 404, error code {@code NoSuchKey} or message
     *         {@code Not Found}; {@code false} for anything else, including client-side exceptions
     */
    protected static boolean isMissingKey(AmazonClientException e) {
        if (e instanceof AmazonServiceException) {
            AmazonServiceException ase = (AmazonServiceException) e;
            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
                    || "Not Found".equals(e.getMessage());
        }
        return false;
    }

    /** @deprecated since 11.1, now unused */
    @Deprecated
    public static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    /**
     * Used in the healthCheck; the transferManager should be initialized and the bucket accessible
     *
     * @since 9.3
     */
    public boolean canAccessBucket() {
        return transferManager != null && transferManager.getAmazonS3Client().doesBucketExist(bucketName);
    }

    @Override
    protected FileStorage getFileStorage() {
        return new S3FileStorage();
    }

    /**
     * Gets the AWSCredentialsProvider.
     *
     * @since 10.2
     */
    public AWSCredentialsProvider getAwsCredentialsProvider() {
        return awsCredentialsProvider;
    }

    /**
     * Gets AmazonS3.
     *
     * @since 10.2
     */
    public AmazonS3 getAmazonS3() {
        return amazonS3;
    }

    /**
     * Gets the bucket name.
     *
     * @since 11.1
     */
    public String getBucketName() {
        return bucketName;
    }

    /**
     * Gets the bucket prefix.
     *
     * @since 11.1
     */
    public String getBucketPrefix() {
        return bucketNamePrefix;
    }

    /**
     * Writes a blob to this store.
     * <p>
     * When the blob is managed by another {@link S3BinaryManager}, an S3-to-S3 copy is attempted first; if that copy
     * is not possible the regular write of the base class is used.
     */
    @Override
    public String writeBlob(Blob blob) throws IOException {
        // Attempt to do S3 Copy if the Source Blob provider is also S3
        if (blob instanceof ManagedBlob) {
            ManagedBlob managedBlob = (ManagedBlob) blob;
            BlobProvider blobProvider = Framework.getService(BlobManager.class)
                                                 .getBlobProvider(managedBlob.getProviderId());
            if (blobProvider instanceof S3BinaryManager && blobProvider != this) {
                // use S3 direct copy as the source blob provider is also S3
                String key = copyBlob((S3BinaryManager) blobProvider, managedBlob.getKey());
                if (key != null) {
                    return key;
                }
            }
        }
        return super.writeBlob(blob);
    }

    /**
     * Copies a blob. Returns {@code null} if the copy was not possible.
     *
     * @param sourceBlobProvider the source blob provider
     * @param blobKey the source blob key
     * @return the copied blob key, or {@code null} if the copy was not possible
     * @throws IOException if the presence of the target object cannot be checked
     * @throws NuxeoException if the source object metadata cannot be read, or if the copy is interrupted
     * @since 10.1
     */
    protected String copyBlob(S3BinaryManager sourceBlobProvider, String blobKey) throws IOException {
        // strip the "providerId:" part of the key, if any
        String digest = blobKey;
        int colon = digest.indexOf(':');
        if (colon >= 0) {
            digest = digest.substring(colon + 1);
        }
        String sourceBucketName = sourceBlobProvider.bucketName;
        String sourceKey = sourceBlobProvider.bucketNamePrefix + digest;
        String key = bucketNamePrefix + digest;
        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("copying blob " + sourceKey + " to " + key);
        }

        try {
            amazonS3.getObjectMetadata(bucketName, key);
            if (log.isDebugEnabled()) {
                log.debug("blob " + key + " is already in S3");
            }
            return digest;
        } catch (AmazonClientException e) {
            // same handling as S3FileStorage.storeFile: client-side failures are reported as IOException too
            if (!isMissingKey(e)) {
                throw new IOException(e);
            }
            // object does not exist, just continue
        }

        // not already present -> copy the blob
        ObjectMetadata sourceMetadata;
        try {
            sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceKey);
        } catch (AmazonServiceException e) {
            throw new NuxeoException("Source blob does not exists: s3://" + sourceBucketName + "/" + sourceKey, e);
        }
        long length = sourceMetadata.getContentLength();
        try {
            CopyObjectRequest copyObjectRequest = new CopyObjectRequest(sourceBucketName, sourceKey, bucketName, key);
            if (useServerSideEncryption) {
                if (isNotBlank(serverSideKMSKeyID)) { // TODO
                    log.warn("S3 copy not supported with KMS, falling back to regular copy");
                    return null;
                }
                // SSE-S3
                ObjectMetadata newObjectMetadata = new ObjectMetadata();
                newObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                copyObjectRequest.setNewObjectMetadata(newObjectMetadata);
            }
            Copy copy = transferManager.copy(copyObjectRequest);
            try {
                copy.waitForCompletion();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new NuxeoException(e);
            }
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("copied blob " + sourceKey + " to " + key + " in " + dtms + "ms");
            }
            return digest;
        } catch (AmazonServiceException e) {
            String message = "S3 copy not supported from s3://" + sourceBucketName + "/" + sourceKey + " to s3://"
                    + bucketName + "/" + key + " (" + length + " bytes)";
            log.warn(message + ", falling back to regular copy: " + e.getMessage());
            log.debug(message, e);
            return null;
        }
    }

    /**
     * File storage that uploads files to, and downloads files from, the S3 bucket of the enclosing manager.
     */
    public class S3FileStorage implements FileStorage {

        /**
         * Uploads the file under the given digest, unless an object already exists for that digest.
         *
         * @throws IOException if the existence check or the upload fails
         */
        @Override
        public void storeFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("storing blob " + digest + " to S3");
            }
            String key = bucketNamePrefix + digest;
            try {
                amazonS3.getObjectMetadata(bucketName, key);
                if (log.isDebugEnabled()) {
                    log.debug("blob " + digest + " is already in S3");
                }
                return;
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                // not already present -> store the blob
            }

            PutObjectRequest request;
            if (!isEncrypted) {
                request = new PutObjectRequest(bucketName, key, file);
                if (useServerSideEncryption) {
                    ObjectMetadata objectMetadata = new ObjectMetadata();
                    if (isNotBlank(serverSideKMSKeyID)) {
                        SSEAwsKeyManagementParams keyManagementParams = new SSEAwsKeyManagementParams(
                                serverSideKMSKeyID);
                        request = request.withSSEAwsKeyManagementParams(keyManagementParams);
                    } else {
                        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                    }
                    request.setMetadata(objectMetadata);
                }
            } else {
                request = new EncryptedPutObjectRequest(bucketName, key, file);
            }
            Upload upload = transferManager.upload(request);
            try {
                upload.waitForUploadResult();
            } catch (AmazonClientException ee) {
                throw new IOException(ee);
            } catch (InterruptedException ee) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ee);
            } finally {
                if (log.isDebugEnabled()) {
                    long dtms = System.currentTimeMillis() - t0;
                    log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
                }
            }
        }

        /**
         * Downloads the object stored for the given digest into the file.
         * <p>
         * Without client-side encryption, when the S3 ETag is not equal to the digest, the downloaded file is
         * digested again and compared with the expected digest.
         *
         * @return {@code true} if the object was downloaded, {@code false} if it does not exist in the bucket
         * @throws IOException if the download fails or if the downloaded content does not match the digest
         */
        @Override
        public boolean fetchFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("fetching blob " + digest + " from S3");
            }
            try {
                Download download = transferManager.download(
                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
                download.waitForCompletion();
                if (isEncrypted) {
                    // can't easily check the decrypted digest
                    return true;
                }
                if (!digest.equals(download.getObjectMetadata().getETag())) {
                    // if our digest algorithm is not MD5 (so the ETag can never match),
                    // or in case of a multipart upload (where the ETag may not be the MD5),
                    // check manually the object integrity
                    // TODO this is costly and it should be possible to deactivate it
                    String currentDigest = new DigestUtils(getDigestAlgorithm()).digestAsHex(file);
                    if (!currentDigest.equals(digest)) {
                        String msg = "Invalid S3 object digest, expected=" + digest + " actual=" + currentDigest;
                        log.error(msg);
                        throw new IOException(msg);
                    }
                }
                return true;
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                if (log.isDebugEnabled()) {
                    long dtms = System.currentTimeMillis() - t0;
                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
                }
            }

        }
    }

    /**
     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
     */
    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {

        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
            super(binaryManager);
        }

        @Override
        public String getId() {
            return "s3:" + binaryManager.bucketName;
        }

        /**
         * Lists the objects directly under the bucket prefix and returns the digests of those that are not marked,
         * updating the GC status counters along the way.
         * <p>
         * Digests found in the bucket are removed from the marked set as they are seen.
         * NOTE(review): this assumes the marked set is not read again after this method returns; confirm against the
         * base class.
         */
        @Override
        public Set<String> getUnmarkedBlobs() {
            // list S3 objects in the bucket
            // record those not marked
            Set<String> unmarked = new HashSet<>();
            int prefixLength = binaryManager.bucketNamePrefix.length();
            ObjectListing list = null;
            do {
                if (list == null) {
                    // use delimiter to avoid useless listing of objects in "subdirectories"
                    ListObjectsRequest listObjectsRequest = new ListObjectsRequest(binaryManager.bucketName,
                            binaryManager.bucketNamePrefix, null, DELIMITER, null);
                    list = binaryManager.amazonS3.listObjects(listObjectsRequest);
                } else {
                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
                }
                for (S3ObjectSummary summary : list.getObjectSummaries()) {
                    String digest = summary.getKey().substring(prefixLength);
                    if (!binaryManager.isValidDigest(digest)) {
                        // ignore files that cannot be digests, for safety
                        continue;
                    }
                    long length = summary.getSize();
                    if (marked.contains(digest)) {
                        status.numBinaries++;
                        status.sizeBinaries += length;
                        // optimize memory: a key is listed only once, the mark is not needed anymore
                        marked.remove(digest);
                    } else {
                        status.numBinariesGC++;
                        status.sizeBinariesGC += length;
                        // record file to delete
                        unmarked.add(digest);
                    }
                }
            } while (list.isTruncated());

            return unmarked;
        }
    }

    // ******************** BlobProvider ********************

    /**
     * Builds a presigned S3 GET URL for the object stored for the given digest.
     * <p>
     * The URL expires {@code directDownloadExpire} seconds from now and asks S3 to answer with the blob's content
     * type and content disposition.
     *
     * @throws IOException if the presigned URL is not a valid URI
     */
    @Override
    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
        String key = bucketNamePrefix + digest;
        Date expiration = new Date();
        // 1000L: do the seconds -> milliseconds conversion in long arithmetic
        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000L);
        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null));
        request.setExpiration(expiration);
        URL url = amazonS3.generatePresignedUrl(request);
        try {
            return url.toURI();
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }

}