/*
 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.blob.s3;

import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.ALLOW_BYTE_RANGE;
import static org.nuxeo.ecm.core.blob.KeyStrategy.VER_SEP;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.common.utils.RFC2231;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.NuxeoPrincipal;
import org.nuxeo.ecm.core.api.SystemPrincipal;
import org.nuxeo.ecm.core.blob.AbstractBlobGarbageCollector;
import org.nuxeo.ecm.core.blob.AbstractBlobStore;
import org.nuxeo.ecm.core.blob.BlobContext;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.BlobStore;
import org.nuxeo.ecm.core.blob.BlobUpdateContext;
import org.nuxeo.ecm.core.blob.BlobWriteContext;
import org.nuxeo.ecm.core.blob.ByteRange;
import org.nuxeo.ecm.core.blob.KeyStrategy;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.io.download.DownloadHelper;
import org.nuxeo.runtime.api.Framework;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.BucketVersioningConfiguration;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectLockLegalHold;
import com.amazonaws.services.s3.model.ObjectLockLegalHoldStatus;
import com.amazonaws.services.s3.model.ObjectLockRetention;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.RestoreObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SetObjectLegalHoldRequest;
import com.amazonaws.services.s3.model.SetObjectRetentionRequest;
import com.amazonaws.services.s3.model.VersionListing;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;

/**
 * Blob storage in S3.
 *
 * @since 11.1
 */
public class S3BlobStore extends AbstractBlobStore {

    private static final Logger log = LogManager.getLogger(S3BlobStore.class);

    // x-amz-meta-username header
    protected static final String USER_METADATA_USERNAME = "username";

    protected final S3BlobStoreConfiguration config;

    protected final AmazonS3 amazonS3;

    protected final String bucketName;

    // prefix prepended to every key to form the bucket key; may be empty
    protected final String bucketPrefix;

    // whether a byte range may be encoded in the key (see readBlob)
    protected final boolean allowByteRange;

    // note, we may choose to not use versions even in a versioned bucket
    // if we want the bucket to record and keep old versions for us
    /** If true, include the object version in the key. */
    protected final boolean useVersion;

    protected final BinaryGarbageCollector gc;

    public S3BlobStore(String name, S3BlobStoreConfiguration config, KeyStrategy keyStrategy) {
        super(name, keyStrategy);
        this.config = config;
        amazonS3 = config.amazonS3;
        bucketName = config.bucketName;
        bucketPrefix = config.bucketPrefix;
        allowByteRange = config.getBooleanProperty(ALLOW_BYTE_RANGE);
        // versions are only used with non-deduplicating key strategies, and only if the bucket supports them
        useVersion = !keyStrategy.useDeDuplication() && isBucketVersioningEnabled();
        gc = new S3BlobGarbageCollector();
    }

    /** Returns this blob store itself. */
    public S3BlobStore getS3BinaryManager() {
        return S3BlobStore.this;
    }

    /** Checks if the exception means that the requested S3 object does not exist. */
    protected static boolean isMissingKey(AmazonServiceException e) {
        return e.getStatusCode() == 404 || "NoSuchKey".equals(e.getErrorCode()) || "Not Found".equals(e.getMessage());
    }

    /** Checks if the exception means that the S3 operation is not implemented by the server. */
    protected static boolean isNotImplemented(AmazonServiceException e) {
        return e.getStatusCode() == 501 || "NotImplemented".equals(e.getErrorCode());
    }

    /**
     * Checks if versioning is enabled (not merely suspended) on the bucket.
     * <p>
     * Returns {@code false} if the server does not implement versioning at all.
     */
    protected boolean isBucketVersioningEnabled() {
        try {
            BucketVersioningConfiguration v = amazonS3.getBucketVersioningConfiguration(bucketName);
            // if versioning is suspended, created objects won't have versions
            return v.getStatus().equals(BucketVersioningConfiguration.ENABLED);
        } catch (AmazonServiceException e) {
            if (isNotImplemented(e)) {
                // minio does not implement versioning
                log.warn("Versioning not implemented for bucket: {}: {}", () -> bucketName, e::getMessage);
                log.debug(e, e);
                return false;
            }
            throw e;
        }
    }

    @Override
    public boolean hasVersioning() {
        return useVersion;
    }

    @Override
    public String writeBlob(BlobWriteContext blobWriteContext) throws IOException {

        // detect copy from another S3 blob provider, to use direct S3-level copy
        BlobContext blobContext = blobWriteContext.blobContext;
        Blob blob = blobContext.blob;
        String copiedKey = copyBlob(blob);
        if (copiedKey != null) {
            return copiedKey;
        }

        Path file;
        String fileTraceSource;
        Path tmp = null;
        try {
            Path blobWriteContextFile = blobWriteContext.getFile();
            if (blobWriteContextFile != null) {
                // we have a file, assume that the caller already observed the write
                file = blobWriteContextFile;
                fileTraceSource = "Nuxeo";
            } else {
                // no transfer to a file was done yet (no caching)
                // we may be able to use the blob's underlying file, if not pure streaming
                File blobFile = blob.getFile();
                if (blobFile != null) {
                    // otherwise use blob file directly
                    if (blobWriteContext.writeObserver != null) {
                        // but we must still run the writes through the write observer
                        transfer(blobWriteContext, NULL_OUTPUT_STREAM);
                    }
                    file = blobFile.toPath();
                    fileTraceSource = "Nuxeo";
                } else {
                    // we must transfer the blob stream to a tmp file
                    tmp = Files.createTempFile("bin_", ".tmp");
                    logTrace(null, "->", "tmp", "write");
                    logTrace("hnote right: " + tmp.getFileName());
                    transfer(blobWriteContext, tmp);
                    file = tmp;
                    fileTraceSource = "tmp";
                }
            }
            String key = blobWriteContext.getKey(); // may depend on write observer, for example for digests
            if (key == null) {
                // should never happen unless an invalid WriteObserver is used in new code
                throw new NuxeoException("Missing key");
            } else if (key.indexOf(VER_SEP) >= 0) {
                // should never happen unless AWS S3 changes their key format
                throw new NuxeoException(
                        "Invalid key '" + key + "', it contains the version separator '" + VER_SEP + "'");
            }
            String versionId = writeFile(key, file, blobContext, fileTraceSource);
            // the returned key embeds the version id when one was generated
            return versionId == null ? key : key + VER_SEP + versionId;
        } finally {
            if (tmp != null) {
                try {
                    logTrace("tmp", "-->", "tmp", "delete");
                    logTrace("hnote right: " + tmp.getFileName());
                    Files.delete(tmp);
                } catch (IOException e) {
                    log.warn(e, e);
                }
            }
        }
    }

    /** Writes a file with the given key and returns its version id. */
    protected String writeFile(String key, Path file, BlobContext blobContext, String fileTraceSource)
            throws IOException {
        String bucketKey = bucketPrefix + key;
        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("Writing s3://" + bucketName + "/" + bucketKey);
        }

        // with deduplication, an object that already exists needs no rewrite
        if (getKeyStrategy().useDeDuplication() && exists(bucketKey)) {
            return null; // no key version used with deduplication
        }

        PutObjectRequest putObjectRequest;
        ObjectMetadata objectMetadata = new ObjectMetadata();
        if (config.useClientSideEncryption) {
            // client-side encryption
            putObjectRequest = new EncryptedPutObjectRequest(bucketName, bucketKey, file.toFile());
        } else {
            // server-side encryption
            putObjectRequest = new PutObjectRequest(bucketName, bucketKey, file.toFile());
            if (config.useServerSideEncryption) {
                if (isNotBlank(config.serverSideKMSKeyID)) {
                    // SSE-KMS
                    SSEAwsKeyManagementParams params = new SSEAwsKeyManagementParams(config.serverSideKMSKeyID);
                    putObjectRequest.setSSEAwsKeyManagementParams(params);
                } else {
                    // SSE-S3
                    objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                }
                // TODO SSE-C
            }
        }
        setMetadata(objectMetadata, blobContext);
        putObjectRequest.setMetadata(objectMetadata);
        logTrace(fileTraceSource, "->", null, "write " + Files.size(file) + " bytes");
        logTrace("hnote right: " + bucketKey);
        Upload upload = config.transferManager.upload(putObjectRequest);
        try {
            UploadResult uploadResult = upload.waitForUploadResult();
            // if we don't want to use versions, ignore them even though the bucket may be versioned
            String versionId = useVersion ? uploadResult.getVersionId() : null;
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("Wrote s3://" + bucketName + "/" + bucketKey + " in " + dtms + "ms");
            }
            if (versionId != null) {
                logTrace("<--", "v=" + versionId);
            }
            return versionId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        } catch (SdkBaseException e) {
            // catch SdkBaseException and not just AmazonServiceException
            throw new NuxeoException("Failed to write blob: " + key, e);
        }
    }

    /**
     * Fills the S3 object metadata from the blob context: Content-Disposition (from the blob filename), Content-Type,
     * and optionally the acting user name as user metadata.
     */
    protected void setMetadata(ObjectMetadata objectMetadata, BlobContext blobContext) {
        if (blobContext != null) {
            Blob blob = blobContext.blob;
            String filename = blob.getFilename();
            if (filename != null) {
                String contentDisposition = RFC2231.encodeContentDisposition(filename, false, null);
                objectMetadata.setContentDisposition(contentDisposition);
            }
            String contentType = DownloadHelper.getContentTypeHeader(blob);
            objectMetadata.setContentType(contentType);
        }
        if (config.metadataAddUsername) {
            NuxeoPrincipal principal = NuxeoPrincipal.getCurrent();
            // system principal is not recorded as a username
            if (principal != null && !(principal instanceof SystemPrincipal)) {
                String username = principal.getActingUser();
                if (username != null) {
                    Map<String, String> userMetadata = Collections.singletonMap(USER_METADATA_USERNAME, username);
                    objectMetadata.setUserMetadata(userMetadata);
                }
            }
        }
    }

    @Override
    public OptionalOrUnknown<Path> getFile(String key) {
        // blobs are remote, no local file is available
        return OptionalOrUnknown.unknown();
    }

    @Override
    public OptionalOrUnknown<InputStream> getStream(String key) throws IOException {
        // blobs are remote, no direct stream is provided
        return OptionalOrUnknown.unknown();
    }

    /** Checks if an object with the given bucket key exists, using a metadata HEAD request. */
    protected boolean exists(String bucketKey) {
        try {
            amazonS3.getObjectMetadata(bucketName, bucketKey);
            if (log.isDebugEnabled()) {
                log.debug("Blob s3://" + bucketName + "/" + bucketKey + " already exists");
            }
            logTrace("<--", "exists");
            logTrace("hnote right: " + bucketKey);
            return true;
        } catch (AmazonServiceException e) {
            if (isMissingKey(e)) {
                logTrace("<--", "missing");
                logTrace("hnote right: " + bucketKey);
                return false;
            }
            throw e;
        }
    }

    // used for tests
    protected void clearBucket() {
        logTrace("group ClearBucket");
        // delete all current objects
        ObjectListing list = null;
        long n = 0;
        do {
            if (list == null) {
                logTrace("->", "listObjects");
                list = amazonS3.listObjects(bucketName);
            } else {
                list = amazonS3.listNextBatchOfObjects(list);
            }
            for (S3ObjectSummary summary : list.getObjectSummaries()) {
                amazonS3.deleteObject(bucketName, summary.getKey());
                n++;
            }
        } while (list.isTruncated());
        if (n > 0) {
            logTrace("loop " + n + " objects");
            logTrace("->", "deleteObject");
            logTrace("end");
        }
        // delete all remaining versions (in a versioned bucket)
        VersionListing vlist = null;
        long vn = 0;
        do {
            if (vlist == null) {
                logTrace("->", "listVersions");
                vlist = amazonS3.listVersions(new ListVersionsRequest().withBucketName(bucketName));
            } else {
                vlist = amazonS3.listNextBatchOfVersions(vlist);

            }
            for (S3VersionSummary vsummary : vlist.getVersionSummaries()) {
                amazonS3.deleteVersion(bucketName, vsummary.getKey(), vsummary.getVersionId());
                vn++;
            }
        } while (vlist.isTruncated());
        if (vn > 0) {
            logTrace("loop " + vn + " versions");
            logTrace("->", "deleteVersion");
            logTrace("end");
        }
        logTrace("end");
    }

    @Override
    public boolean readBlob(String key, Path dest) throws IOException {
        // the key may carry a byte range suffix when the provider allows it
        ByteRange byteRange;
        if (allowByteRange) {
            MutableObject<String> keyHolder = new MutableObject<>(key);
            byteRange = getByteRangeFromKey(keyHolder);
            key = keyHolder.getValue();
        } else {
            byteRange = null;
        }
        // the key may carry a version id after the version separator
        String objectKey;
        String versionId;
        int seppos;
        if (useVersion && (seppos = key.indexOf(VER_SEP)) > 0) {
            objectKey = key.substring(0, seppos);
            versionId = key.substring(seppos + 1);
        } else {
            objectKey = key;
            versionId = null;
        }
        String bucketKey = bucketPrefix + objectKey;
        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("Reading s3://" + bucketName + "/" + bucketKey);
        }
        try {
            GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, bucketKey, versionId);
            if (byteRange != null) {
                getObjectRequest.setRange(byteRange.getStart(), byteRange.getEnd());
            }
            Download download = config.transferManager.download(getObjectRequest, dest.toFile());
            download.waitForCompletion();
            logTrace("<-", "read " + Files.size(dest) + " bytes");
            logTrace("hnote right: " + bucketKey + (versionId == null ? "" : " v=" + versionId));
            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("Read s3://" + bucketName + "/" + bucketKey + " in " + dtms + "ms");
            }
            if (config.useClientSideEncryption) {
                // can't efficiently check the decrypted digest
                return true;
            }
            if (config.useServerSideEncryption && isNotBlank(config.serverSideKMSKeyID)) {
                // can't get digest from key when using KMS
                return true;
            }
            if (byteRange != null) {
                // can't check digest if we have a byte range
                return true;
            }
            String expectedDigest = getKeyStrategy().getDigestFromKey(objectKey);
            if (expectedDigest != null) {
                checkDigest(expectedDigest, download, dest);
            }
            // else nothing to compare to, key is not digest-based
            return true;
        } catch (AmazonServiceException e) {
            if (isMissingKey(e)) {
                logTrace("<--", "missing");
                logTrace("hnote right: " + bucketKey + (versionId == null ? "" : " v=" + versionId));
                if (log.isDebugEnabled()) {
                    log.debug("Blob s3://" + bucketName + "/" + bucketKey + " does not exist");
                }
                return false;
            }
            throw new IOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
    }

    /**
     * Checks that the downloaded file matches the expected digest, first by comparing against the ETag, then if that
     * differs by re-computing the digest of the downloaded file.
     *
     * @throws IOException if the computed digest does not match the expected one
     */
    protected void checkDigest(String expectedDigest, Download download, Path file) throws IOException {
        if (!expectedDigest.equals(download.getObjectMetadata().getETag())) {
            // if our digest algorithm is not MD5 (so the ETag can never match),
            // or in case of a multipart upload (where the ETag may not be the MD5),
            // check manually the object integrity
            // TODO this is costly and it should possible to deactivate it
            String digest = new DigestUtils(config.digestConfiguration.digestAlgorithm).digestAsHex(file.toFile());
            if (!digest.equals(expectedDigest)) {
                String msg = "Invalid S3 object digest, expected=" + expectedDigest + " actual=" + digest;
                log.warn(msg);
                throw new IOException(msg);
            }
        }
    }

    /**
     * Copies the blob as a direct S3 operation, if possible.
     *
     * @return the key, or {@code null} if no copy was done
     */
    protected String copyBlob(Blob blob) throws IOException {
        if (!(blob instanceof ManagedBlob)) {
            // not a managed blob
            return null;
        }
        ManagedBlob managedBlob = (ManagedBlob) blob;
        BlobProvider blobProvider = Framework.getService(BlobManager.class)
                                             .getBlobProvider(managedBlob.getProviderId());
        if (!getKeyStrategy().useDeDuplication()) {
            // key is not digest-based, so we don't know the destination key
            return null;
        }
        if (!(blobProvider instanceof S3BlobProvider)) {
            // not an S3 blob
            return null;
        }
        S3BlobStore sourceStore = (S3BlobStore) ((S3BlobProvider) blobProvider).store.unwrap();
        if (!sourceStore.getKeyStrategy().equals(getKeyStrategy())) {
            // not the same digest
            return null;
        }
        // use S3-level copy as the source blob provider is also S3
        String sourceKey = stripBlobKeyPrefix(managedBlob.getKey());
        String key = sourceKey; // because same key strategy
        boolean found = copyBlob(key, sourceStore, sourceKey, false);
        if (!found) {
            throw new IOException("Cannot find source blob: " + sourceKey);
        }
        return key;
    }

    @Override
    public boolean copyBlobIsOptimized(BlobStore sourceStore) {
        return sourceStore instanceof S3BlobStore;
    }

    @Override
    public boolean copyBlob(String key, BlobStore sourceStore, String sourceKey, boolean atomicMove)
            throws IOException {
        BlobStore unwrappedSourceStore = sourceStore.unwrap();
        if (unwrappedSourceStore instanceof S3BlobStore) {
            // attempt direct S3-level copy
            S3BlobStore sourceS3BlobStore = (S3BlobStore) unwrappedSourceStore;
            try {
                boolean copied = copyBlob(key, sourceS3BlobStore, sourceKey, atomicMove);
                if (copied) {
                    return true;
                }
            } catch (AmazonServiceException e) {
                if (isMissingKey(e)) {
                    logTrace("<--", "missing");
                    // source not found
                    return false;
                }
                throw new IOException(e);
            }
            // fall through if not copied
        }
        return copyBlobGeneric(key, sourceStore, sourceKey, atomicMove);
    }

    /**
     * @return {@code false} if generic copy is needed
     * @throws AmazonServiceException if the source is missing
     */
    protected boolean copyBlob(String key, S3BlobStore sourceBlobStore, String sourceKey, boolean move)
            throws AmazonServiceException { // NOSONAR
        String sourceBucketName = sourceBlobStore.bucketName;
        String sourceBucketKey = sourceBlobStore.bucketPrefix + sourceKey;
        String bucketKey = bucketPrefix + key;

        long t0 = 0;
        if (log.isDebugEnabled()) {
            t0 = System.currentTimeMillis();
            log.debug("Copying s3://" + sourceBucketName + "/" + sourceBucketKey + " to s3://" + bucketName + "/"
                    + bucketKey);
        }

        // with deduplication, an already-existing destination needs no copy
        if (getKeyStrategy().useDeDuplication() && exists(bucketKey)) {
            return true;
        }

        // copy the blob
        logTrace("->", "getObjectMetadata");
        ObjectMetadata sourceMetadata = amazonS3.getObjectMetadata(sourceBucketName, sourceBucketKey);
        // don't catch AmazonServiceException if missing, caller will do it
        long length = sourceMetadata.getContentLength();
        logTrace("<-", length + " bytes");
        try {

            copyBlob(sourceBlobStore.config, sourceBucketKey, config, bucketKey, move);

            if (log.isDebugEnabled()) {
                long dtms = System.currentTimeMillis() - t0;
                log.debug("Copied s3://" + sourceBucketName + "/" + sourceBucketKey + " to s3://" + bucketName + "/"
                        + bucketKey + " in " + dtms + "ms");
            }
            return true;
        } catch (AmazonServiceException e) {
            logTrace("<--", "ERROR");
            String message = "Direct copy failed from s3://" + sourceBucketName + "/" + sourceBucketKey + " to s3://"
                    + bucketName + "/" + bucketKey + " (" + length + " bytes)";
            log.warn(message + ", falling back to slow copy: " + e.getMessage());
            log.debug(message, e);
            return false;
        }
    }

    /**
     * Performs the actual S3-level copy through the transfer manager, applying the destination's server-side
     * encryption configuration, and deletes the source object afterwards when {@code move} is {@code true}.
     */
    protected void copyBlob(S3BlobStoreConfiguration sourceConfig, String sourceKey,
            S3BlobStoreConfiguration destinationConfig, String destinationKey, boolean move) {
        CopyObjectRequest copyObjectRequest = new CopyObjectRequest(sourceConfig.bucketName, sourceKey,
                destinationConfig.bucketName, destinationKey);
        if (destinationConfig.useServerSideEncryption) {
            // server-side encryption
            if (isNotBlank(destinationConfig.serverSideKMSKeyID)) {
                // SSE-KMS
                SSEAwsKeyManagementParams params = new SSEAwsKeyManagementParams(destinationConfig.serverSideKMSKeyID);
                copyObjectRequest.setSSEAwsKeyManagementParams(params);
            } else {
                // SSE-S3
                ObjectMetadata newObjectMetadata = new ObjectMetadata();
                newObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                // CopyCallable.populateMetadataWithEncryptionParams will get the rest from the source
                copyObjectRequest.setNewObjectMetadata(newObjectMetadata);
            }
            // TODO SSE-C
        }
        logTrace("->", "copyObject");
        logTrace("hnote right: " + sourceKey + " to " + destinationKey);
        Copy copy = destinationConfig.transferManager.copy(copyObjectRequest, sourceConfig.amazonS3, null);
        try {
            copy.waitForCompletion();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
        logTrace("<--", "copied");
        if (move) {
            logTrace("->", "deleteObject");
            logTrace("hnote right: " + sourceKey);
            amazonS3.deleteObject(sourceConfig.bucketName, sourceKey);
        }
    }

    /**
     * Generic (slow) copy: reads the source blob to a local file if needed, then writes it to this store.
     *
     * @return {@code false} if the source blob was not found
     */
    protected boolean copyBlobGeneric(String key, BlobStore sourceStore, String sourceKey, boolean atomicMove)
            throws IOException {
        Path tmp = null;
        try {
            OptionalOrUnknown<Path> fileOpt = sourceStore.getFile(sourceKey);
            Path file;
            String fileTraceSource;
            if (fileOpt.isPresent()) {
                file = fileOpt.get();
                fileTraceSource = sourceStore.getName();
            } else {
                // no local file available, read from source
                tmp = Files.createTempFile("bin_", ".tmp");
                logTrace(null, "->", "tmp", "write");
                logTrace("hnote right: " + tmp.getFileName());
                boolean found = sourceStore.readBlob(sourceKey, tmp);
                if (!found) {
                    return false;
                }
                file = tmp;
                fileTraceSource = "tmp";
            }
            String versionId = writeFile(key, file, null, fileTraceSource); // always atomic
            if (versionId != null) {
                throw new NuxeoException("Cannot copy blob if store has versioning");
            }
            if (atomicMove) {
                sourceStore.deleteBlob(sourceKey);
            }
            return true;
        } finally {
            if (tmp != null) {
                try {
                    logTrace("tmp", "-->", "tmp", "delete");
                    logTrace("hnote right: " + tmp.getFileName());
                    Files.delete(tmp);
                } catch (IOException e) {
                    log.warn(e, e);
                }
            }
        }
    }

    @Override
    public void writeBlobProperties(BlobUpdateContext blobUpdateContext) throws IOException {
        String key = blobUpdateContext.key;
        String objectKey;
        String versionId;
        int seppos = key.indexOf(VER_SEP);
        if (seppos < 0) {
            objectKey = key;
            versionId = null;
        } else {
            objectKey = key.substring(0, seppos);
            versionId = key.substring(seppos + 1);
        }
        String bucketKey = bucketPrefix + objectKey;
        try {
            if (blobUpdateContext.updateRetainUntil != null) {
                // object-lock retention requires a versioned object
                if (versionId == null) {
                    throw new IOException("Cannot set retention on non-versioned blob");
                }
                Calendar retainUntil = blobUpdateContext.updateRetainUntil.retainUntil;
                Date retainUntilDate = retainUntil == null ? null : retainUntil.getTime();
                ObjectLockRetention retention = new ObjectLockRetention();
                retention.withMode(config.retentionMode) //
                         .withRetainUntilDate(retainUntilDate);
                SetObjectRetentionRequest request = new SetObjectRetentionRequest();
                request.withBucketName(bucketName) //
                       .withKey(bucketKey)
                       .withVersionId(versionId)
                       .withRetention(retention);
                logTrace("->", "setObjectRetention");
                logTrace("hnote right: " + bucketKey + "v=" + versionId);
                logTrace("rnote right: " + (retainUntil == null ? "null" : retainUntil.toInstant().toString()));
                amazonS3.setObjectRetention(request);
            }
            if (blobUpdateContext.updateLegalHold != null) {
                // legal hold also requires a versioned object
                if (versionId == null) {
                    throw new IOException("Cannot set legal hold on non-versioned blob");
                }
                boolean hold = blobUpdateContext.updateLegalHold.hold;
                ObjectLockLegalHoldStatus status = hold ? ObjectLockLegalHoldStatus.ON : ObjectLockLegalHoldStatus.OFF;
                ObjectLockLegalHold legalHold = new ObjectLockLegalHold().withStatus(status);
                SetObjectLegalHoldRequest request = new SetObjectLegalHoldRequest();
                request.withBucketName(bucketName) //
                       .withKey(bucketKey)
                       .withVersionId(versionId)
                       .withLegalHold(legalHold);
                logTrace("->", "setObjectLegalHold");
                logTrace("hnote right: " + bucketKey + "v=" + versionId);
                logTrace("rnote right: " + status.toString());
                amazonS3.setObjectLegalHold(request);
            }
            if (blobUpdateContext.restoreForDuration != null) {
                Duration duration = blobUpdateContext.restoreForDuration.duration;
                // round up duration to days
                int days = (int) duration.plusDays(1).minusSeconds(1).toDays();
                RestoreObjectRequest request = new RestoreObjectRequest(bucketName, bucketKey, days).withVersionId(
                        versionId);
                amazonS3.restoreObjectV2(request);
            }
        } catch (AmazonServiceException e) {
            if (isMissingKey(e)) {
                logTrace("<--", "missing");
                if (log.isDebugEnabled()) {
                    log.debug("Blob s3://" + bucketName + "/" + bucketKey + " does not exist");
                }
            }
            // note: an IOException is thrown even when the key is missing, after logging it
            throw new IOException(e);
        }
    }

    @Override
    public void deleteBlob(String key) {
        // the key may carry a version id after the version separator
        String objectKey;
        String versionId;
        int seppos = key.indexOf(VER_SEP);
        if (seppos < 0) {
            objectKey = key;
            versionId = null;
        } else {
            objectKey = key.substring(0, seppos);
            versionId = key.substring(seppos + 1);
        }
        String bucketKey = bucketPrefix + objectKey;
        try {
            if (versionId == null) {
                logTrace("->", "deleteObject");
                logTrace("hnote right: " + bucketKey);
                amazonS3.deleteObject(bucketName, bucketKey);
            } else {
                logTrace("->", "deleteVersion");
                logTrace("hnote right: " + bucketKey + " v=" + versionId);
                amazonS3.deleteVersion(bucketName, bucketKey, versionId);
            }
        } catch (AmazonServiceException e) {
            // deletion is best-effort: a missing key is traced, other errors are only logged
            if (isMissingKey(e)) {
                logTrace("<--", "missing");
            } else {
                log.warn(e, e);
            }
        }
    }

    @Override
    public BinaryGarbageCollector getBinaryGarbageCollector() {
        return gc;
    }

    /**
     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
     */
    public class S3BlobGarbageCollector extends AbstractBlobGarbageCollector {

        @Override
        public String getId() {
            return "s3:" + bucketName + "/" + bucketPrefix;
        }

        @Override
        public Set<String> getUnmarkedBlobsAndUpdateStatus() {
            // list S3 objects in the bucket
            // record those not marked
            boolean useDeDuplication = keyStrategy.useDeDuplication();
            Set<String> unmarked = new HashSet<>();
            ObjectListing list = null;
            int prefixLength = bucketPrefix.length();
            logTrace("->", "listObjects");
            do {
                if (list == null) {
                    // use delimiter to avoid useless listing of objects in "subdirectories"
                    ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName, bucketPrefix, null,
                            S3BlobStoreConfiguration.DELIMITER, null);
                    list = amazonS3.listObjects(listObjectsRequest);
                } else {
                    list = amazonS3.listNextBatchOfObjects(list);
                }
                for (S3ObjectSummary summary : list.getObjectSummaries()) {
                    // strip the bucket prefix to recover the blob key
                    String key = summary.getKey().substring(prefixLength);
                    if (useDeDuplication) {
                        if (!config.digestConfiguration.isValidDigest(key)) {
                            // ignore files that cannot be digests, for safety
                            continue;
                        }
                    }
                    long length = summary.getSize();
                    if (marked.contains(key)) {
                        status.numBinaries++;
                        status.sizeBinaries += length;
                    } else {
                        status.numBinariesGC++;
                        status.sizeBinariesGC += length;
                        // record file to delete
                        unmarked.add(key);
                    }
                }
            } while (list.isTruncated());
            logTrace("<--", (status.numBinaries + status.numBinariesGC) + " objects");
            return unmarked;
        }

        @Override
        public void removeBlobs(Set<String> keys) {
            keys.forEach(S3BlobStore.this::deleteBlob);
        }
    }

}