001/* 002 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mathieu Guillaume 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.storage.sql; 021 022import static org.apache.commons.lang.StringUtils.isBlank; 023import static org.apache.commons.lang.StringUtils.isNotBlank; 024 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.IOException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.net.URL; 031import java.security.GeneralSecurityException; 032import java.security.KeyPair; 033import java.security.KeyStore; 034import java.security.PrivateKey; 035import java.security.PublicKey; 036import java.security.cert.Certificate; 037import java.util.Collection; 038import java.util.Date; 039import java.util.HashSet; 040import java.util.Set; 041import java.util.regex.Pattern; 042 043import javax.servlet.http.HttpServletRequest; 044 045import org.apache.commons.lang.StringUtils; 046import org.apache.commons.logging.Log; 047import org.apache.commons.logging.LogFactory; 048import org.nuxeo.common.Environment; 049import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector; 050import org.nuxeo.ecm.blob.AbstractCloudBinaryManager; 051import org.nuxeo.ecm.core.api.Blob; 052import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo; 053import org.nuxeo.ecm.core.blob.ManagedBlob; 054import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider; 055import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 056import org.nuxeo.ecm.core.blob.binary.FileStorage; 057import org.nuxeo.ecm.core.model.Document; 058 059import com.amazonaws.AmazonClientException; 060import com.amazonaws.AmazonServiceException; 061import com.amazonaws.ClientConfiguration; 062import com.amazonaws.HttpMethod; 063import com.amazonaws.auth.AWSCredentialsProvider; 064import com.amazonaws.auth.InstanceProfileCredentialsProvider; 065import com.amazonaws.services.s3.AmazonS3; 066import com.amazonaws.services.s3.AmazonS3Client; 067import com.amazonaws.services.s3.AmazonS3EncryptionClient; 068import com.amazonaws.services.s3.internal.ServiceUtils; 069import com.amazonaws.services.s3.model.CannedAccessControlList; 070import com.amazonaws.services.s3.model.CryptoConfiguration; 071import com.amazonaws.services.s3.model.EncryptedPutObjectRequest; 072import com.amazonaws.services.s3.model.EncryptionMaterials; 073import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; 074import com.amazonaws.services.s3.model.GetObjectRequest; 075import com.amazonaws.services.s3.model.ObjectListing; 076import com.amazonaws.services.s3.model.ObjectMetadata; 077import com.amazonaws.services.s3.model.PutObjectRequest; 078import com.amazonaws.services.s3.model.S3ObjectSummary; 079import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider; 080import com.amazonaws.services.s3.transfer.TransferManager; 081import com.amazonaws.services.s3.transfer.Upload; 082import com.amazonaws.services.s3.transfer.model.UploadResult; 083import com.google.common.base.MoreObjects; 084 085/** 086 * A Binary Manager that stores binaries as S3 BLOBs 087 * <p> 088 * The BLOBs are cached locally on first access for efficiency. 089 * <p> 090 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 091 * if accessed before the stream. 092 */ 093public class S3BinaryManager extends AbstractCloudBinaryManager { 094 095 private static final String MD5 = "MD5"; // must be MD5 for Etag 096 097 @Override 098 protected String getDefaultDigestAlgorithm() { 099 return MD5; 100 } 101 102 private static final Log log = LogFactory.getLog(S3BinaryManager.class); 103 104 public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage"; 105 106 public static final String BUCKET_NAME_PROPERTY = "bucket"; 107 108 public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix"; 109 110 public static final String BUCKET_REGION_PROPERTY = "region"; 111 112 public static final String DEFAULT_BUCKET_REGION = null; // US East 113 114 public static final String AWS_ID_PROPERTY = "awsid"; 115 116 public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID"; 117 118 public static final String AWS_SECRET_PROPERTY = "awssecret"; 119 120 public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY"; 121 122 /** AWS ClientConfiguration default 50 */ 123 public static final String CONNECTION_MAX_PROPERTY = "connection.max"; 124 125 /** AWS ClientConfiguration default 3 (with exponential backoff) */ 126 public static final String CONNECTION_RETRY_PROPERTY = "connection.retry"; 127 128 /** AWS ClientConfiguration default 50*1000 = 50s */ 129 public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout"; 130 131 /** AWS ClientConfiguration default 50*1000 = 50s */ 132 public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout"; 133 134 public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file"; 135 136 public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password"; 137 138 public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside"; 139 140 public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias"; 141 142 public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password"; 143 144 public static final String ENDPOINT_PROPERTY = "endpoint"; 145 146 public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3"; 147 148 public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire"; 149 150 private static final Pattern MD5_RE = Pattern.compile("(.*/)?[0-9a-f]{32}"); 151 152 protected String bucketName; 153 154 protected String bucketNamePrefix; 155 156 protected AWSCredentialsProvider awsCredentialsProvider; 157 158 protected ClientConfiguration clientConfiguration; 159 160 protected EncryptionMaterials encryptionMaterials; 161 162 protected boolean isEncrypted; 163 164 protected CryptoConfiguration cryptoConfiguration; 165 166 protected boolean userServerSideEncryption; 167 168 protected AmazonS3 amazonS3; 169 170 protected TransferManager transferManager; 171 172 @Override 173 public void close() { 174 // this also shuts down the AmazonS3Client 175 transferManager.shutdownNow(); 176 super.close(); 177 } 178 179 /** 180 * Aborts uploads that crashed and are older than 1 day. 181 * 182 * @since 7.2 183 */ 184 protected void abortOldUploads() throws IOException { 185 int oneDay = 1000 * 60 * 60 * 24; 186 try { 187 transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay)); 188 } catch (AmazonClientException e) { 189 throw new IOException("Failed to abort old uploads", e); 190 } 191 } 192 193 @Override 194 protected void setupCloudClient() throws IOException { 195 // Get settings from the configuration 196 bucketName = getProperty(BUCKET_NAME_PROPERTY); 197 bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY); 198 String bucketRegion = getProperty(BUCKET_REGION_PROPERTY); 199 if (isBlank(bucketRegion)) { 200 bucketRegion = DEFAULT_BUCKET_REGION; 201 } 202 String awsID = getProperty(AWS_ID_PROPERTY); 203 String awsSecret = getProperty(AWS_SECRET_PROPERTY); 204 205 String proxyHost = getProperty(Environment.NUXEO_HTTP_PROXY_HOST); 206 String proxyPort = getProperty(Environment.NUXEO_HTTP_PROXY_PORT); 207 String proxyLogin = getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN); 208 String proxyPassword = getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD); 209 210 int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY); 211 int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY); 212 int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY); 213 int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY); 214 215 String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY); 216 String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY); 217 String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY); 218 String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY); 219 String endpoint = getProperty(ENDPOINT_PROPERTY); 220 String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY); 221 if (isNotBlank(sseprop)) { 222 userServerSideEncryption = Boolean.parseBoolean(sseprop); 223 } 224 225 // Fallback on default env keys for ID and secret 226 if (isBlank(awsID)) { 227 awsID = System.getenv(AWS_ID_ENV); 228 } 229 if (isBlank(awsSecret)) { 230 awsSecret = System.getenv(AWS_SECRET_ENV); 231 } 232 233 if (isBlank(bucketName)) { 234 throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY); 235 } 236 237 if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) { 238 log.warn(String.format("%s %s S3 bucket prefix should end by '/' " + ": added automatically.", 239 BUCKET_PREFIX_PROPERTY, bucketNamePrefix)); 240 bucketNamePrefix += "/"; 241 } 242 // set up credentials 243 if (isBlank(awsID) || isBlank(awsSecret)) { 244 awsCredentialsProvider = new InstanceProfileCredentialsProvider(); 245 try { 246 awsCredentialsProvider.getCredentials(); 247 } catch (AmazonClientException e) { 248 throw new RuntimeException("Missing AWS credentials and no instance role found"); 249 } 250 } else { 251 awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret); 252 } 253 254 // set up client configuration 255 clientConfiguration = new ClientConfiguration(); 256 if (isNotBlank(proxyHost)) { 257 clientConfiguration.setProxyHost(proxyHost); 258 } 259 if (isNotBlank(proxyPort)) { 260 clientConfiguration.setProxyPort(Integer.parseInt(proxyPort)); 261 } 262 if (isNotBlank(proxyLogin)) { 263 clientConfiguration.setProxyUsername(proxyLogin); 264 } 265 if (proxyPassword != null) { // could be blank 266 clientConfiguration.setProxyPassword(proxyPassword); 267 } 268 if (maxConnections > 0) { 269 clientConfiguration.setMaxConnections(maxConnections); 270 } 271 if (maxErrorRetry >= 0) { // 0 is allowed 272 clientConfiguration.setMaxErrorRetry(maxErrorRetry); 273 } 274 if (connectionTimeout >= 0) { // 0 is allowed 275 clientConfiguration.setConnectionTimeout(connectionTimeout); 276 } 277 if (socketTimeout >= 0) { // 0 is allowed 278 clientConfiguration.setSocketTimeout(socketTimeout); 279 } 280 281 // set up encryption 282 encryptionMaterials = null; 283 if (isNotBlank(keystoreFile)) { 284 boolean confok = true; 285 if (keystorePass == null) { // could be blank 286 log.error("Keystore password missing"); 287 confok = false; 288 } 289 if (isBlank(privkeyAlias)) { 290 log.error("Key alias missing"); 291 confok = false; 292 } 293 if (privkeyPass == null) { // could be blank 294 log.error("Key password missing"); 295 confok = false; 296 } 297 if (!confok) { 298 throw new RuntimeException("S3 Crypto configuration incomplete"); 299 } 300 try { 301 // Open keystore 302 File ksFile = new File(keystoreFile); 303 FileInputStream ksStream = new FileInputStream(ksFile); 304 KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); 305 keystore.load(ksStream, keystorePass.toCharArray()); 306 ksStream.close(); 307 // Get keypair for alias 308 if (!keystore.isKeyEntry(privkeyAlias)) { 309 throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias"); 310 } 311 PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray()); 312 Certificate cert = keystore.getCertificate(privkeyAlias); 313 PublicKey pubKey = cert.getPublicKey(); 314 KeyPair keypair = new KeyPair(pubKey, privKey); 315 // Get encryptionMaterials from keypair 316 encryptionMaterials = new EncryptionMaterials(keypair); 317 cryptoConfiguration = new CryptoConfiguration(); 318 } catch (IOException | GeneralSecurityException e) { 319 throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e); 320 } 321 } 322 isEncrypted = encryptionMaterials != null; 323 324 // Try to create bucket if it doesn't exist 325 if (!isEncrypted) { 326 amazonS3 = new AmazonS3Client(awsCredentialsProvider, clientConfiguration); 327 } else { 328 amazonS3 = new AmazonS3EncryptionClient(awsCredentialsProvider, new StaticEncryptionMaterialsProvider( 329 encryptionMaterials), clientConfiguration, cryptoConfiguration); 330 } 331 if (isNotBlank(endpoint)) { 332 amazonS3.setEndpoint(endpoint); 333 } 334 335 try { 336 if (!amazonS3.doesBucketExist(bucketName)) { 337 amazonS3.createBucket(bucketName, bucketRegion); 338 amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private); 339 } 340 } catch (AmazonClientException e) { 341 throw new IOException(e); 342 } 343 344 // compat for NXP-17895, using "downloadfroms3", to be removed 345 // these two fields have already been initialized by the base class initialize() 346 // using standard property "directdownload" 347 String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT); 348 if (dd != null) { 349 directDownload = Boolean.parseBoolean(dd); 350 } 351 int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT); 352 if (dde >= 0) { 353 directDownloadExpire = dde; 354 } 355 356 transferManager = new TransferManager(amazonS3); 357 abortOldUploads(); 358 } 359 360 protected void removeBinary(String digest) { 361 amazonS3.deleteObject(bucketName, digest); 362 } 363 364 @Override 365 protected String getSystemPropertyPrefix() { 366 return SYSTEM_PROPERTY_PREFIX; 367 } 368 369 @Override 370 protected BinaryGarbageCollector instantiateGarbageCollector() { 371 return new S3BinaryGarbageCollector(this); 372 } 373 374 @Override 375 public void removeBinaries(Collection<String> digests) { 376 digests.forEach(this::removeBinary); 377 } 378 379 protected static boolean isMissingKey(AmazonClientException e) { 380 if (e instanceof AmazonServiceException) { 381 AmazonServiceException ase = (AmazonServiceException) e; 382 return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode()) 383 || "Not Found".equals(e.getMessage()); 384 } 385 return false; 386 } 387 388 public static boolean isMD5(String digest) { 389 return MD5_RE.matcher(digest).matches(); 390 } 391 392 @Override 393 protected FileStorage getFileStorage() { 394 return new S3FileStorage(); 395 } 396 397 public class S3FileStorage implements FileStorage { 398 399 @Override 400 public void storeFile(String digest, File file) throws IOException { 401 long t0 = 0; 402 if (log.isDebugEnabled()) { 403 t0 = System.currentTimeMillis(); 404 log.debug("storing blob " + digest + " to S3"); 405 } 406 String etag; 407 String key = bucketNamePrefix + digest; 408 try { 409 ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, key); 410 etag = metadata.getETag(); 411 if (log.isDebugEnabled()) { 412 log.debug("blob " + digest + " is already in S3"); 413 } 414 } catch (AmazonClientException e) { 415 if (!isMissingKey(e)) { 416 throw new IOException(e); 417 } 418 // not already present -> store the blob 419 PutObjectRequest request; 420 if (!isEncrypted) { 421 request = new PutObjectRequest(bucketName, key, file); 422 if (userServerSideEncryption) { 423 ObjectMetadata objectMetadata = new ObjectMetadata(); 424 objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); 425 request.setMetadata(objectMetadata); 426 } 427 } else { 428 request = new EncryptedPutObjectRequest(bucketName, key, file); 429 } 430 Upload upload = transferManager.upload(request); 431 try { 432 UploadResult result = upload.waitForUploadResult(); 433 etag = result.getETag(); 434 } catch (AmazonClientException ee) { 435 throw new IOException(ee); 436 } catch (InterruptedException ee) { 437 // reset interrupted status 438 Thread.currentThread().interrupt(); 439 // continue interrupt 440 throw new RuntimeException(ee); 441 } finally { 442 if (log.isDebugEnabled()) { 443 long dtms = System.currentTimeMillis() - t0; 444 log.debug("stored blob " + digest + " to S3 in " + dtms + "ms"); 445 } 446 } 447 } 448 // check transfer went ok 449 if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) { 450 // When the blob is not encrypted by S3, the MD5 remotely 451 // computed by S3 and passed as a Etag should match the locally 452 // computed MD5 digest. 453 // This check cannot be done when encryption is enabled unless 454 // we could replicate that encryption locally just for that 455 // purpose which would add further load and complexity on the 456 // client. 457 throw new IOException("Invalid ETag in S3, ETag=" + etag + " digest=" + digest); 458 } 459 } 460 461 @Override 462 public boolean fetchFile(String digest, File file) throws IOException { 463 long t0 = 0; 464 if (log.isDebugEnabled()) { 465 t0 = System.currentTimeMillis(); 466 log.debug("fetching blob " + digest + " from S3"); 467 } 468 try { 469 470 ObjectMetadata metadata = amazonS3.getObject( 471 new GetObjectRequest(bucketName, bucketNamePrefix + digest), file); 472 // check ETag 473 String etag = metadata.getETag(); 474 if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) { 475 log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest); 476 return false; 477 } 478 return true; 479 } catch (AmazonClientException e) { 480 if (!isMissingKey(e)) { 481 throw new IOException(e); 482 } 483 return false; 484 } finally { 485 if (log.isDebugEnabled()) { 486 long dtms = System.currentTimeMillis() - t0; 487 log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms"); 488 } 489 } 490 491 } 492 493 @Override 494 public Long fetchLength(String digest) throws IOException { 495 long t0 = 0; 496 if (log.isDebugEnabled()) { 497 t0 = System.currentTimeMillis(); 498 log.debug("fetching blob length " + digest + " from S3"); 499 } 500 try { 501 ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, bucketNamePrefix + digest); 502 // check ETag 503 String etag = metadata.getETag(); 504 if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) { 505 log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest); 506 return null; 507 } 508 return Long.valueOf(metadata.getContentLength()); 509 } catch (AmazonClientException e) { 510 if (!isMissingKey(e)) { 511 throw new IOException(e); 512 } 513 return null; 514 } finally { 515 if (log.isDebugEnabled()) { 516 long dtms = System.currentTimeMillis() - t0; 517 log.debug("fetched blob length " + digest + " from S3 in " + dtms + "ms"); 518 } 519 } 520 } 521 } 522 523 /** 524 * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory. 525 */ 526 public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> { 527 528 protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) { 529 super(binaryManager); 530 } 531 532 @Override 533 public String getId() { 534 return "s3:" + binaryManager.bucketName; 535 } 536 537 @Override 538 public Set<String> getUnmarkedBlobs() { 539 // list S3 objects in the bucket 540 // record those not marked 541 Set<String> unmarked = new HashSet<>(); 542 ObjectListing list = null; 543 do { 544 if (list == null) { 545 list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix); 546 } else { 547 list = binaryManager.amazonS3.listNextBatchOfObjects(list); 548 } 549 for (S3ObjectSummary summary : list.getObjectSummaries()) { 550 String digest = summary.getKey(); 551 if (!isMD5(digest)) { 552 // ignore files that cannot be MD5 digests for 553 // safety 554 continue; 555 } 556 long length = summary.getSize(); 557 if (marked.contains(digest)) { 558 status.numBinaries++; 559 status.sizeBinaries += length; 560 } else { 561 status.numBinariesGC++; 562 status.sizeBinariesGC += length; 563 // record file to delete 564 unmarked.add(digest); 565 marked.remove(digest); // optimize memory 566 } 567 } 568 } while (list.isTruncated()); 569 570 return unmarked; 571 } 572 } 573 574 // ******************** BlobProvider ******************** 575 576 @Override 577 public Blob readBlob(BlobInfo blobInfo) throws IOException { 578 // just delegate to avoid copy/pasting code 579 return new BinaryBlobProvider(this).readBlob(blobInfo); 580 } 581 582 @Override 583 public String writeBlob(Blob blob, Document doc) throws IOException { 584 // just delegate to avoid copy/pasting code 585 return new BinaryBlobProvider(this).writeBlob(blob, doc); 586 } 587 588 @Override 589 protected boolean isDirectDownload() { 590 return directDownload; 591 } 592 593 @Override 594 protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException { 595 String key = bucketNamePrefix + digest; 596 Date expiration = new Date(); 597 expiration.setTime(expiration.getTime() + directDownloadExpire * 1000); 598 GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET); 599 request.addRequestParameter("response-content-type", getContentTypeHeader(blob)); 600 request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, servletRequest)); 601 request.setExpiration(expiration); 602 URL url = amazonS3.generatePresignedUrl(request); 603 try { 604 return url.toURI(); 605 } catch (URISyntaxException e) { 606 throw new IOException(e); 607 } 608 } 609 610}