/*
 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Mathieu Guillaume
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServletRequest;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.runtime.api.Framework;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.google.common.base.MoreObjects;

/**
 * A Binary Manager that stores binaries as S3 BLOBs.
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
 */
public class S3BinaryManager extends AbstractCloudBinaryManager {

    private static final String MD5 = "MD5"; // must be MD5 for ETag

    @Override
    protected String getDefaultDigestAlgorithm() {
        return MD5;
    }

    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";

    public static final String BUCKET_NAME_PROPERTY = "bucket";

    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

    public static final String BUCKET_REGION_PROPERTY = "region";

    public static final String DEFAULT_BUCKET_REGION = "us-east-1"; // US East

    public static final String AWS_ID_PROPERTY = "awsid";

    public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID";

    public static final String AWS_SECRET_PROPERTY = "awssecret";

    public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY";

    /** AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_PROPERTY = "connection.max";

    /** AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";

    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";

    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";

    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";

    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";

    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";

    public static final String ENDPOINT_PROPERTY = "endpoint";

    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";

    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";

    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");
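
    // For reference, a minimal configuration sketch (assuming the usual nuxeo.conf /
    // Framework properties mechanism resolves the constants above under
    // SYSTEM_PROPERTY_PREFIX; the bucket name and region below are placeholders):
    //
    //   nuxeo.s3storage.bucket=my-bucket
    //   nuxeo.s3storage.region=eu-west-1
    //   nuxeo.s3storage.awsid=...        (falls back to the AWS_ACCESS_KEY_ID env var)
    //   nuxeo.s3storage.awssecret=...    (falls back to the AWS_SECRET_ACCESS_KEY env var)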

    protected String bucketName;

    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    protected EncryptionMaterials encryptionMaterials;

    protected boolean isEncrypted;

    protected CryptoConfiguration cryptoConfiguration;

    protected boolean userServerSideEncryption;

    protected AmazonS3 amazonS3;

    protected TransferManager transferManager;

    @Override
    public void close() {
        // this also shuts down the AmazonS3Client
        transferManager.shutdownNow();
        super.close();
    }

    /**
     * Aborts uploads that crashed and are older than 1 day.
     *
     * @since 7.2
     */
    protected void abortOldUploads() throws IOException {
        int oneDay = 1000 * 60 * 60 * 24;
        try {
            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
        } catch (AmazonS3Exception e) {
            if (e.getStatusCode() == 400 || e.getStatusCode() == 404) {
                log.error("Your cloud provider does not support aborting old uploads");
                return;
            }
            throw new IOException("Failed to abort old uploads", e);
        }
    }

    @Override
    protected void setupCloudClient() throws IOException {
        // Get settings from the configuration
        bucketName = getProperty(BUCKET_NAME_PROPERTY);
        bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY);
        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
        if (isBlank(bucketRegion)) {
            bucketRegion = DEFAULT_BUCKET_REGION;
        }
        String awsID = getProperty(AWS_ID_PROPERTY);
        String awsSecret = getProperty(AWS_SECRET_PROPERTY);

        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);

        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);

        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
        String endpoint = getProperty(ENDPOINT_PROPERTY);
        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
        if (isNotBlank(sseprop)) {
            userServerSideEncryption = Boolean.parseBoolean(sseprop);
        }

        // Fall back on the default env keys for the ID and secret
        if (isBlank(awsID)) {
            awsID = System.getenv(AWS_ID_ENV);
        }
        if (isBlank(awsSecret)) {
            awsSecret = System.getenv(AWS_SECRET_ENV);
        }

        if (isBlank(bucketName)) {
            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
        }

        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) {
            log.warn(String.format("%s %s S3 bucket prefix should end with '/': added automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
            bucketNamePrefix += "/";
        }
        // set up credentials
        if (isBlank(awsID) || isBlank(awsSecret)) {
            awsCredentialsProvider = InstanceProfileCredentialsProvider.getInstance();
            try {
                awsCredentialsProvider.getCredentials();
            } catch (AmazonClientException e) {
                throw new RuntimeException("Missing AWS credentials and no instance role found", e);
            }
        } else {
            awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret);
        }

        // set up client configuration
        clientConfiguration = new ClientConfiguration();
        if (isNotBlank(proxyHost)) {
            clientConfiguration.setProxyHost(proxyHost);
        }
        if (isNotBlank(proxyPort)) {
            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
        }
        if (isNotBlank(proxyLogin)) {
            clientConfiguration.setProxyUsername(proxyLogin);
        }
        if (proxyPassword != null) { // could be blank
            clientConfiguration.setProxyPassword(proxyPassword);
        }
        if (maxConnections > 0) {
            clientConfiguration.setMaxConnections(maxConnections);
        }
        if (maxErrorRetry >= 0) { // 0 is allowed
            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
        }
        if (connectionTimeout >= 0) { // 0 is allowed
            clientConfiguration.setConnectionTimeout(connectionTimeout);
        }
        if (socketTimeout >= 0) { // 0 is allowed
            clientConfiguration.setSocketTimeout(socketTimeout);
        }

        // set up client-side encryption
        encryptionMaterials = null;
        if (isNotBlank(keystoreFile)) {
            boolean confok = true;
            if (keystorePass == null) { // could be blank
                log.error("Keystore password missing");
                confok = false;
            }
            if (isBlank(privkeyAlias)) {
                log.error("Key alias missing");
                confok = false;
            }
            if (privkeyPass == null) { // could be blank
                log.error("Key password missing");
                confok = false;
            }
            if (!confok) {
                throw new RuntimeException("S3 Crypto configuration incomplete");
            }
            try {
                // Open the keystore (try-with-resources so the stream is closed even on failure)
                File ksFile = new File(keystoreFile);
                KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
                try (FileInputStream ksStream = new FileInputStream(ksFile)) {
                    keystore.load(ksStream, keystorePass.toCharArray());
                }
                // Get the keypair for the configured alias
                if (!keystore.isKeyEntry(privkeyAlias)) {
                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
                }
                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
                Certificate cert = keystore.getCertificate(privkeyAlias);
                PublicKey pubKey = cert.getPublicKey();
                KeyPair keypair = new KeyPair(pubKey, privKey);
                // Build encryptionMaterials from the keypair
                encryptionMaterials = new EncryptionMaterials(keypair);
                cryptoConfiguration = new CryptoConfiguration();
            } catch (IOException | GeneralSecurityException e) {
                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
            }
        }
        isEncrypted = encryptionMaterials != null;
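
        // For context, a hypothetical way to produce such a keystore with the JDK keytool
        // (file path, alias and passwords below are placeholders, not values this code expects):
        //
        //   keytool -genkeypair -keyalg RSA -alias nuxeo-s3 -keystore /path/to/keystore.jks \
        //       -storepass <keystore-password> -keypass <key-password>
        //
        // The same file, alias and passwords are then supplied through the crypt.keystore.file,
        // crypt.keystore.password, crypt.key.alias and crypt.key.password properties read above.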

        @SuppressWarnings("rawtypes")
        AwsClientBuilder s3Builder;
        if (!isEncrypted) {
            s3Builder = AmazonS3ClientBuilder.standard()
                                             .withCredentials(awsCredentialsProvider)
                                             .withClientConfiguration(clientConfiguration);
        } else {
            s3Builder = AmazonS3EncryptionClientBuilder.standard()
                                                       .withClientConfiguration(clientConfiguration)
                                                       .withCryptoConfiguration(cryptoConfiguration)
                                                       .withCredentials(awsCredentialsProvider)
                                                       .withEncryptionMaterials(
                                                               new StaticEncryptionMaterialsProvider(encryptionMaterials));
        }
        if (isNotBlank(endpoint)) {
            s3Builder = s3Builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, bucketRegion));
        } else {
            s3Builder = s3Builder.withRegion(bucketRegion);
        }

        amazonS3 = (AmazonS3) s3Builder.build();

        // Try to create the bucket if it doesn't exist
        try {
            if (!amazonS3.doesBucketExist(bucketName)) {
                amazonS3.createBucket(bucketName);
                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
            }
        } catch (AmazonClientException e) {
            throw new IOException(e);
        }

        // compat for NXP-17895, using "downloadfroms3", to be removed
        // these two fields have already been initialized by the base class initialize()
        // using standard property "directdownload"
        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
        if (dd != null) {
            directDownload = Boolean.parseBoolean(dd);
        }
        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
        if (dde >= 0) {
            directDownloadExpire = dde;
        }

        transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build();
        abortOldUploads();
    }

    protected void removeBinary(String digest) {
        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
    }

    @Override
    protected String getSystemPropertyPrefix() {
        return SYSTEM_PROPERTY_PREFIX;
    }

    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new S3BinaryGarbageCollector(this);
    }

    @Override
    public void removeBinaries(Collection<String> digests) {
        digests.forEach(this::removeBinary);
    }

    protected static boolean isMissingKey(AmazonClientException e) {
        if (e instanceof AmazonServiceException) {
            AmazonServiceException ase = (AmazonServiceException) e;
            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
                    || "Not Found".equals(e.getMessage());
        }
        return false;
    }

    public static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    /**
     * Used by the health check; the transferManager must be initialized and the bucket accessible.
     *
     * @since 9.3
     */
    public boolean canAccessBucket() {
        return transferManager != null && transferManager.getAmazonS3Client().doesBucketExist(bucketName);
    }

    @Override
    protected FileStorage getFileStorage() {
        return new S3FileStorage();
    }

    public class S3FileStorage implements FileStorage {

        @Override
        public void storeFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("storing blob " + digest + " to S3");
            }
            String key = bucketNamePrefix + digest;
            try {
                amazonS3.getObjectMetadata(bucketName, key);
                if (log.isDebugEnabled()) {
                    log.debug("blob " + digest + " is already in S3");
                }
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                // not already present -> store the blob
                PutObjectRequest request;
                if (!isEncrypted) {
                    request = new PutObjectRequest(bucketName, key, file);
                    if (userServerSideEncryption) {
                        ObjectMetadata objectMetadata = new ObjectMetadata();
                        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                        request.setMetadata(objectMetadata);
                    }
                } else {
                    request = new EncryptedPutObjectRequest(bucketName, key, file);
                }
                Upload upload = transferManager.upload(request);
                try {
                    upload.waitForUploadResult();
                } catch (AmazonClientException ee) {
                    throw new IOException(ee);
                } catch (InterruptedException ee) {
                    // reset interrupted status
                    Thread.currentThread().interrupt();
                    // continue interrupt
                    throw new RuntimeException(ee);
                } finally {
                    if (log.isDebugEnabled()) {
                        long dtms = System.currentTimeMillis() - t0;
                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
                    }
                }
            }
        }

        @Override
        public boolean fetchFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("fetching blob " + digest + " from S3");
            }
            try {
                Download download = transferManager.download(
                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
                download.waitForCompletion();
                // Check the ETag: by default it is the MD5 digest, unless the upload was multipart
                if (!isEncrypted && !digest.equals(download.getObjectMetadata().getETag())) {
                    // this happens with multipart uploads: verify the downloaded file instead
                    String currentDigest;
                    try (FileInputStream input = new FileInputStream(file)) {
                        currentDigest = DigestUtils.md5Hex(input);
                    }
                    if (!currentDigest.equals(digest)) {
                        log.error("Invalid ETag in S3, currentDigest=" + currentDigest + " expectedDigest=" + digest);
                        throw new IOException("Invalid S3 object, corrupted content: expected digest " + digest
                                + " but got " + currentDigest);
                    }
                }
                return true;
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                return false;
            } catch (InterruptedException e) {
                // reset interrupted status
                Thread.currentThread().interrupt();
                // continue interrupt
                throw new RuntimeException(e);
            } finally {
                if (log.isDebugEnabled()) {
                    long dtms = System.currentTimeMillis() - t0;
                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
                }
            }
        }
    }

    /**
     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
     */
    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {

        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
            super(binaryManager);
        }

        @Override
        public String getId() {
            return "s3:" + binaryManager.bucketName;
        }

        @Override
        public Set<String> getUnmarkedBlobs() {
            // list the S3 objects in the bucket and record those not marked
            Set<String> unmarked = new HashSet<>();
            ObjectListing list = null;
            do {
                if (list == null) {
                    list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix);
                } else {
                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
                }
                int prefixLength = binaryManager.bucketNamePrefix.length();
                for (S3ObjectSummary summary : list.getObjectSummaries()) {
                    String digest = summary.getKey().substring(prefixLength);
                    if (!isMD5(digest)) {
                        // ignore files that cannot be MD5 digests, for safety
                        continue;
                    }
                    long length = summary.getSize();
                    if (marked.contains(digest)) {
                        status.numBinaries++;
                        status.sizeBinaries += length;
                    } else {
                        status.numBinariesGC++;
                        status.sizeBinariesGC += length;
                        // record the file for deletion
                        unmarked.add(digest);
                        marked.remove(digest); // optimize memory
                    }
                }
            } while (list.isTruncated());

            return unmarked;
        }
    }

    // ******************** BlobProvider ********************
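
    // getRemoteUri below backs the direct download feature: when it is enabled, blobs can be
    // served through a time-limited presigned S3 URL instead of being proxied by the server.
    // A rough configuration sketch (assuming the base class resolves the standard
    // "directdownload" properties under the same nuxeo.s3storage prefix; values are placeholders):
    //
    //   nuxeo.s3storage.directdownload=true
    //   nuxeo.s3storage.directdownload.expire=3600    (in seconds)
    //
    // The legacy "downloadfroms3" / "downloadfroms3.expire" properties read in setupCloudClient()
    // override these values for compatibility with NXP-17895.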

    @Override
    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
        String key = bucketNamePrefix + digest;
        Date expiration = new Date();
        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null));
        request.setExpiration(expiration);
        URL url = amazonS3.generatePresignedUrl(request);
        try {
            return url.toURI();
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }

}