001/* 002 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Mathieu Guillaume 016 * Florent Guillaume 017 */ 018package org.nuxeo.ecm.core.storage.sql; 019 020import static org.apache.commons.lang.StringUtils.isBlank; 021import static org.apache.commons.lang.StringUtils.isNotBlank; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.URL; 029import java.security.GeneralSecurityException; 030import java.security.KeyPair; 031import java.security.KeyStore; 032import java.security.PrivateKey; 033import java.security.PublicKey; 034import java.security.cert.Certificate; 035import java.util.Collection; 036import java.util.Date; 037import java.util.HashSet; 038import java.util.Set; 039import java.util.regex.Pattern; 040 041import javax.servlet.http.HttpServletRequest; 042 043import org.apache.commons.lang.StringUtils; 044import org.apache.commons.logging.Log; 045import org.apache.commons.logging.LogFactory; 046import org.nuxeo.common.Environment; 047import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector; 048import org.nuxeo.ecm.blob.AbstractCloudBinaryManager; 049import org.nuxeo.ecm.core.api.Blob; 050import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo; 051import org.nuxeo.ecm.core.blob.ManagedBlob; 052import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider; 053import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 054import org.nuxeo.ecm.core.blob.binary.FileStorage; 055import org.nuxeo.ecm.core.model.Document; 056import org.nuxeo.runtime.api.Framework; 057 058import com.amazonaws.AmazonClientException; 059import com.amazonaws.AmazonServiceException; 060import com.amazonaws.ClientConfiguration; 061import com.amazonaws.HttpMethod; 062import com.amazonaws.auth.AWSCredentialsProvider; 063import com.amazonaws.auth.InstanceProfileCredentialsProvider; 064import com.amazonaws.services.s3.AmazonS3; 065import com.amazonaws.services.s3.AmazonS3Client; 066import com.amazonaws.services.s3.AmazonS3EncryptionClient; 067import com.amazonaws.services.s3.internal.ServiceUtils; 068import com.amazonaws.services.s3.model.CannedAccessControlList; 069import com.amazonaws.services.s3.model.CryptoConfiguration; 070import com.amazonaws.services.s3.model.EncryptedPutObjectRequest; 071import com.amazonaws.services.s3.model.EncryptionMaterials; 072import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; 073import com.amazonaws.services.s3.model.GetObjectRequest; 074import com.amazonaws.services.s3.model.ObjectListing; 075import com.amazonaws.services.s3.model.ObjectMetadata; 076import com.amazonaws.services.s3.model.PutObjectRequest; 077import com.amazonaws.services.s3.model.S3ObjectSummary; 078import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider; 079import com.amazonaws.services.s3.transfer.TransferManager; 080import com.amazonaws.services.s3.transfer.Upload; 081import com.amazonaws.services.s3.transfer.model.UploadResult; 082import com.google.common.base.MoreObjects; 083 084/** 085 * A Binary Manager that stores binaries as S3 BLOBs 086 * <p> 087 * The BLOBs are cached locally on first access for efficiency. 088 * <p> 089 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 090 * if accessed before the stream. 091 */ 092public class S3BinaryManager extends AbstractCloudBinaryManager { 093 094 private static final String MD5 = "MD5"; // must be MD5 for Etag 095 096 @Override 097 protected String getDefaultDigestAlgorithm() { 098 return MD5; 099 } 100 101 private static final Log log = LogFactory.getLog(S3BinaryManager.class); 102 103 public static final String PROPERTY_PREFIX = "nuxeo.s3storage"; 104 105 public static final String BUCKET_NAME_KEY = "nuxeo.s3storage.bucket"; 106 107 public static final String BUCKET_PREFIX_KEY = "nuxeo.s3storage.bucket.prefix"; 108 109 public static final String BUCKET_REGION_KEY = "nuxeo.s3storage.region"; 110 111 public static final String DEFAULT_BUCKET_REGION = null; // US East 112 113 public static final String AWS_ID_KEY = "nuxeo.s3storage.awsid"; 114 115 public static final String AWS_ID_ENV_KEY = "AWS_ACCESS_KEY_ID"; 116 117 public static final String AWS_SECRET_KEY = "nuxeo.s3storage.awssecret"; 118 119 public static final String AWS_SECRET_ENV_KEY = "AWS_SECRET_ACCESS_KEY"; 120 121 public static final String CACHE_SIZE_KEY = "nuxeo.s3storage.cachesize"; 122 123 /** AWS ClientConfiguration default 50 */ 124 public static final String CONNECTION_MAX_KEY = "nuxeo.s3storage.connection.max"; 125 126 /** AWS ClientConfiguration default 3 (with exponential backoff) */ 127 public static final String CONNECTION_RETRY_KEY = "nuxeo.s3storage.connection.retry"; 128 129 /** AWS ClientConfiguration default 50*1000 = 50s */ 130 public static final String CONNECTION_TIMEOUT_KEY = "nuxeo.s3storage.connection.timeout"; 131 132 /** AWS ClientConfiguration default 50*1000 = 50s */ 133 public static final String SOCKET_TIMEOUT_KEY = "nuxeo.s3storage.socket.timeout"; 134 135 public static final String KEYSTORE_FILE_KEY = "nuxeo.s3storage.crypt.keystore.file"; 136 137 public static final String KEYSTORE_PASS_KEY = "nuxeo.s3storage.crypt.keystore.password"; 138 139 public static final String PRIVKEY_ALIAS_KEY = "nuxeo.s3storage.crypt.key.alias"; 140 141 public static final String PRIVKEY_PASS_KEY = "nuxeo.s3storage.crypt.key.password"; 142 143 public static final String ENDPOINT_KEY = "nuxeo.s3storage.endpoint"; 144 145 public static final String DIRECTDOWNLOAD_KEY = "nuxeo.s3storage.downloadfroms3"; 146 147 public static final String DIRECTDOWNLOAD_EXPIRE_KEY = "nuxeo.s3storage.downloadfroms3.expire"; 148 149 private static final Pattern MD5_RE = Pattern.compile("(.*/)?[0-9a-f]{32}"); 150 151 protected String bucketName; 152 153 protected String bucketNamePrefix; 154 155 protected AWSCredentialsProvider awsCredentialsProvider; 156 157 protected ClientConfiguration clientConfiguration; 158 159 protected EncryptionMaterials encryptionMaterials; 160 161 protected boolean isEncrypted; 162 163 protected CryptoConfiguration cryptoConfiguration; 164 165 protected AmazonS3 amazonS3; 166 167 protected TransferManager transferManager; 168 169 @Override 170 public void close() { 171 // this also shuts down the AmazonS3Client 172 transferManager.shutdownNow(); 173 super.close(); 174 } 175 176 /** 177 * Aborts uploads that crashed and are older than 1 day. 178 * 179 * @since 7.2 180 */ 181 protected void abortOldUploads() throws IOException { 182 int oneDay = 1000 * 60 * 60 * 24; 183 try { 184 transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay)); 185 } catch (AmazonClientException e) { 186 throw new IOException("Failed to abort old uploads", e); 187 } 188 } 189 190 @Override 191 protected void setupCloudClient() throws IOException { 192 // Get settings from the configuration 193 // TODO parse properties too 194 bucketName = Framework.getProperty(BUCKET_NAME_KEY); 195 bucketNamePrefix = MoreObjects.firstNonNull(Framework.getProperty(BUCKET_PREFIX_KEY), StringUtils.EMPTY); 196 String bucketRegion = Framework.getProperty(BUCKET_REGION_KEY); 197 if (isBlank(bucketRegion)) { 198 bucketRegion = DEFAULT_BUCKET_REGION; 199 } 200 String awsID = Framework.getProperty(AWS_ID_KEY); 201 String awsSecret = Framework.getProperty(AWS_SECRET_KEY); 202 203 String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST); 204 String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT); 205 String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN); 206 String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD); 207 208 int maxConnections = getIntFrameworkProperty(CONNECTION_MAX_KEY); 209 int maxErrorRetry = getIntFrameworkProperty(CONNECTION_RETRY_KEY); 210 int connectionTimeout = getIntFrameworkProperty(CONNECTION_TIMEOUT_KEY); 211 int socketTimeout = getIntFrameworkProperty(SOCKET_TIMEOUT_KEY); 212 213 String keystoreFile = Framework.getProperty(KEYSTORE_FILE_KEY); 214 String keystorePass = Framework.getProperty(KEYSTORE_PASS_KEY); 215 String privkeyAlias = Framework.getProperty(PRIVKEY_ALIAS_KEY); 216 String privkeyPass = Framework.getProperty(PRIVKEY_PASS_KEY); 217 String endpoint = Framework.getProperty(ENDPOINT_KEY); 218 219 // Fallback on default env keys for ID and secret 220 if (isBlank(awsID)) { 221 awsID = System.getenv(AWS_ID_ENV_KEY); 222 } 223 if (isBlank(awsSecret)) { 224 awsSecret = System.getenv(AWS_SECRET_ENV_KEY); 225 } 226 227 if (isBlank(bucketName)) { 228 throw new RuntimeException("Missing conf: " + BUCKET_NAME_KEY); 229 } 230 231 if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) { 232 log.warn(String.format("%s %s S3 bucket prefix should end by '/' " + ": added automatically.", 233 BUCKET_PREFIX_KEY, bucketNamePrefix)); 234 bucketNamePrefix += "/"; 235 } 236 // set up credentials 237 if (isBlank(awsID) || isBlank(awsSecret)) { 238 awsCredentialsProvider = new InstanceProfileCredentialsProvider(); 239 try { 240 awsCredentialsProvider.getCredentials(); 241 } catch (AmazonClientException e) { 242 throw new RuntimeException("Missing AWS credentials and no instance role found"); 243 } 244 } else { 245 awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret); 246 } 247 248 // set up client configuration 249 clientConfiguration = new ClientConfiguration(); 250 if (isNotBlank(proxyHost)) { 251 clientConfiguration.setProxyHost(proxyHost); 252 } 253 if (isNotBlank(proxyPort)) { 254 clientConfiguration.setProxyPort(Integer.parseInt(proxyPort)); 255 } 256 if (isNotBlank(proxyLogin)) { 257 clientConfiguration.setProxyUsername(proxyLogin); 258 } 259 if (proxyPassword != null) { // could be blank 260 clientConfiguration.setProxyPassword(proxyPassword); 261 } 262 if (maxConnections > 0) { 263 clientConfiguration.setMaxConnections(maxConnections); 264 } 265 if (maxErrorRetry >= 0) { // 0 is allowed 266 clientConfiguration.setMaxErrorRetry(maxErrorRetry); 267 } 268 if (connectionTimeout >= 0) { // 0 is allowed 269 clientConfiguration.setConnectionTimeout(connectionTimeout); 270 } 271 if (socketTimeout >= 0) { // 0 is allowed 272 clientConfiguration.setSocketTimeout(socketTimeout); 273 } 274 275 // set up encryption 276 encryptionMaterials = null; 277 if (isNotBlank(keystoreFile)) { 278 boolean confok = true; 279 if (keystorePass == null) { // could be blank 280 log.error("Keystore password missing"); 281 confok = false; 282 } 283 if (isBlank(privkeyAlias)) { 284 log.error("Key alias missing"); 285 confok = false; 286 } 287 if (privkeyPass == null) { // could be blank 288 log.error("Key password missing"); 289 confok = false; 290 } 291 if (!confok) { 292 throw new RuntimeException("S3 Crypto configuration incomplete"); 293 } 294 try { 295 // Open keystore 296 File ksFile = new File(keystoreFile); 297 FileInputStream ksStream = new FileInputStream(ksFile); 298 KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); 299 keystore.load(ksStream, keystorePass.toCharArray()); 300 ksStream.close(); 301 // Get keypair for alias 302 if (!keystore.isKeyEntry(privkeyAlias)) { 303 throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias"); 304 } 305 PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray()); 306 Certificate cert = keystore.getCertificate(privkeyAlias); 307 PublicKey pubKey = cert.getPublicKey(); 308 KeyPair keypair = new KeyPair(pubKey, privKey); 309 // Get encryptionMaterials from keypair 310 encryptionMaterials = new EncryptionMaterials(keypair); 311 cryptoConfiguration = new CryptoConfiguration(); 312 } catch (IOException | GeneralSecurityException e) { 313 throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e); 314 } 315 } 316 isEncrypted = encryptionMaterials != null; 317 318 // Try to create bucket if it doesn't exist 319 if (!isEncrypted) { 320 amazonS3 = new AmazonS3Client(awsCredentialsProvider, clientConfiguration); 321 } else { 322 amazonS3 = new AmazonS3EncryptionClient(awsCredentialsProvider, new StaticEncryptionMaterialsProvider( 323 encryptionMaterials), clientConfiguration, cryptoConfiguration); 324 } 325 if (isNotBlank(endpoint)) { 326 amazonS3.setEndpoint(endpoint); 327 } 328 329 try { 330 if (!amazonS3.doesBucketExist(bucketName)) { 331 amazonS3.createBucket(bucketName, bucketRegion); 332 amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private); 333 } 334 } catch (AmazonClientException e) { 335 throw new IOException(e); 336 } 337 338 directDownload = Boolean.parseBoolean(Framework.getProperty(DIRECTDOWNLOAD_KEY, DEFAULT_DIRECTDOWNLOAD)); 339 directDownloadExpire = getIntFrameworkProperty(DIRECTDOWNLOAD_EXPIRE_KEY); 340 if (directDownloadExpire < 0) { 341 directDownloadExpire = DEFAULT_DIRECTDOWNLOAD_EXPIRE; 342 } 343 344 transferManager = new TransferManager(amazonS3); 345 abortOldUploads(); 346 } 347 348 protected void removeBinary(String digest) { 349 amazonS3.deleteObject(bucketName, digest); 350 } 351 352 @Override 353 protected String getPropertyPrefix() { 354 return PROPERTY_PREFIX; 355 } 356 357 @Override 358 protected BinaryGarbageCollector instantiateGarbageCollector() { 359 return new S3BinaryGarbageCollector(this); 360 } 361 362 @Override 363 public void removeBinaries(Collection<String> digests) { 364 digests.forEach(this::removeBinary); 365 } 366 367 protected static boolean isMissingKey(AmazonClientException e) { 368 if (e instanceof AmazonServiceException) { 369 AmazonServiceException ase = (AmazonServiceException) e; 370 return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode()) 371 || "Not Found".equals(e.getMessage()); 372 } 373 return false; 374 } 375 376 public static boolean isMD5(String digest) { 377 return MD5_RE.matcher(digest).matches(); 378 } 379 380 @Override 381 protected FileStorage getFileStorage() { 382 return new S3FileStorage(); 383 } 384 385 public class S3FileStorage implements FileStorage { 386 387 @Override 388 public void storeFile(String digest, File file) throws IOException { 389 long t0 = 0; 390 if (log.isDebugEnabled()) { 391 t0 = System.currentTimeMillis(); 392 log.debug("storing blob " + digest + " to S3"); 393 } 394 String etag; 395 String key = bucketNamePrefix + digest; 396 try { 397 ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, key); 398 etag = metadata.getETag(); 399 if (log.isDebugEnabled()) { 400 log.debug("blob " + digest + " is already in S3"); 401 } 402 } catch (AmazonClientException e) { 403 if (!isMissingKey(e)) { 404 throw new IOException(e); 405 } 406 // not already present -> store the blob 407 PutObjectRequest request; 408 if (!isEncrypted) { 409 request = new PutObjectRequest(bucketName, key, file); 410 } else { 411 request = new EncryptedPutObjectRequest(bucketName, key, file); 412 } 413 Upload upload = transferManager.upload(request); 414 try { 415 UploadResult result = upload.waitForUploadResult(); 416 etag = result.getETag(); 417 } catch (AmazonClientException ee) { 418 throw new IOException(ee); 419 } catch (InterruptedException ee) { 420 // reset interrupted status 421 Thread.currentThread().interrupt(); 422 // continue interrupt 423 throw new RuntimeException(ee); 424 } finally { 425 if (log.isDebugEnabled()) { 426 long dtms = System.currentTimeMillis() - t0; 427 log.debug("stored blob " + digest + " to S3 in " + dtms + "ms"); 428 } 429 } 430 } 431 // check transfer went ok 432 if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) { 433 // When the blob is not encrypted by S3, the MD5 remotely 434 // computed by S3 and passed as a Etag should match the locally 435 // computed MD5 digest. 436 // This check cannot be done when encryption is enabled unless 437 // we could replicate that encryption locally just for that 438 // purpose which would add further load and complexity on the 439 // client. 440 throw new IOException("Invalid ETag in S3, ETag=" + etag + " digest=" + digest); 441 } 442 } 443 444 @Override 445 public boolean fetchFile(String digest, File file) throws IOException { 446 long t0 = 0; 447 if (log.isDebugEnabled()) { 448 t0 = System.currentTimeMillis(); 449 log.debug("fetching blob " + digest + " from S3"); 450 } 451 try { 452 453 ObjectMetadata metadata = amazonS3.getObject( 454 new GetObjectRequest(bucketName, bucketNamePrefix + digest), file); 455 // check ETag 456 String etag = metadata.getETag(); 457 if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) { 458 log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest); 459 return false; 460 } 461 return true; 462 } catch (AmazonClientException e) { 463 if (!isMissingKey(e)) { 464 throw new IOException(e); 465 } 466 return false; 467 } finally { 468 if (log.isDebugEnabled()) { 469 long dtms = System.currentTimeMillis() - t0; 470 log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms"); 471 } 472 } 473 474 } 475 476 @Override 477 public Long fetchLength(String digest) throws IOException { 478 long t0 = 0; 479 if (log.isDebugEnabled()) { 480 t0 = System.currentTimeMillis(); 481 log.debug("fetching blob length " + digest + " from S3"); 482 } 483 try { 484 ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, bucketNamePrefix + digest); 485 // check ETag 486 String etag = metadata.getETag(); 487 if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) { 488 log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest); 489 return null; 490 } 491 return Long.valueOf(metadata.getContentLength()); 492 } catch (AmazonClientException e) { 493 if (!isMissingKey(e)) { 494 throw new IOException(e); 495 } 496 return null; 497 } finally { 498 if (log.isDebugEnabled()) { 499 long dtms = System.currentTimeMillis() - t0; 500 log.debug("fetched blob length " + digest + " from S3 in " + dtms + "ms"); 501 } 502 } 503 } 504 } 505 506 /** 507 * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory. 508 */ 509 public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> { 510 511 protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) { 512 super(binaryManager); 513 } 514 515 @Override 516 public String getId() { 517 return "s3:" + binaryManager.bucketName; 518 } 519 520 @Override 521 public Set<String> getUnmarkedBlobs() { 522 // list S3 objects in the bucket 523 // record those not marked 524 Set<String> unmarked = new HashSet<>(); 525 ObjectListing list = null; 526 do { 527 if (list == null) { 528 list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix); 529 } else { 530 list = binaryManager.amazonS3.listNextBatchOfObjects(list); 531 } 532 for (S3ObjectSummary summary : list.getObjectSummaries()) { 533 String digest = summary.getKey(); 534 if (!isMD5(digest)) { 535 // ignore files that cannot be MD5 digests for 536 // safety 537 continue; 538 } 539 long length = summary.getSize(); 540 if (marked.contains(digest)) { 541 status.numBinaries++; 542 status.sizeBinaries += length; 543 } else { 544 status.numBinariesGC++; 545 status.sizeBinariesGC += length; 546 // record file to delete 547 unmarked.add(digest); 548 marked.remove(digest); // optimize memory 549 } 550 } 551 } while (list.isTruncated()); 552 553 return unmarked; 554 } 555 } 556 557 // ******************** BlobProvider ******************** 558 559 @Override 560 public Blob readBlob(BlobInfo blobInfo) throws IOException { 561 // just delegate to avoid copy/pasting code 562 return new BinaryBlobProvider(this).readBlob(blobInfo); 563 } 564 565 @Override 566 public String writeBlob(Blob blob, Document doc) throws IOException { 567 // just delegate to avoid copy/pasting code 568 return new BinaryBlobProvider(this).writeBlob(blob, doc); 569 } 570 571 @Override 572 protected boolean isDirectDownload() { 573 return directDownload; 574 } 575 576 @Override 577 protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException { 578 String key = bucketNamePrefix + digest; 579 Date expiration = new Date(); 580 expiration.setTime(expiration.getTime() + directDownloadExpire * 1000); 581 GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET); 582 request.addRequestParameter("response-content-type", getContentTypeHeader(blob)); 583 request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, servletRequest)); 584 request.setExpiration(expiration); 585 URL url = amazonS3.generatePresignedUrl(request); 586 try { 587 return url.toURI(); 588 } catch (URISyntaxException e) { 589 throw new IOException(e); 590 } 591 } 592 593}