001/* 002 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mathieu Guillaume 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.storage.sql; 021 022import static org.apache.commons.lang.StringUtils.isBlank; 023import static org.apache.commons.lang.StringUtils.isNotBlank; 024 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.IOException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.net.URL; 031import java.security.GeneralSecurityException; 032import java.security.KeyPair; 033import java.security.KeyStore; 034import java.security.PrivateKey; 035import java.security.PublicKey; 036import java.security.cert.Certificate; 037import java.util.Collection; 038import java.util.Date; 039import java.util.HashSet; 040import java.util.Set; 041import java.util.regex.Pattern; 042 043import org.apache.commons.codec.digest.DigestUtils; 044 045import javax.servlet.http.HttpServletRequest; 046 047import org.apache.commons.lang.StringUtils; 048import org.apache.commons.logging.Log; 049import org.apache.commons.logging.LogFactory; 050import org.nuxeo.common.Environment; 051import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector; 052import org.nuxeo.ecm.blob.AbstractCloudBinaryManager; 053import org.nuxeo.ecm.core.blob.ManagedBlob; 054import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 055import org.nuxeo.ecm.core.blob.binary.FileStorage; 056import org.nuxeo.runtime.api.Framework; 057 058import com.amazonaws.AmazonClientException; 059import com.amazonaws.AmazonServiceException; 060import com.amazonaws.ClientConfiguration; 061import com.amazonaws.HttpMethod; 062import com.amazonaws.auth.AWSCredentialsProvider; 063import com.amazonaws.auth.InstanceProfileCredentialsProvider; 064import com.amazonaws.services.s3.AmazonS3; 065import com.amazonaws.services.s3.AmazonS3ClientBuilder; 066import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder; 067import com.amazonaws.services.s3.model.CannedAccessControlList; 068import com.amazonaws.services.s3.model.CryptoConfiguration; 069import com.amazonaws.services.s3.model.EncryptedPutObjectRequest; 070import com.amazonaws.services.s3.model.EncryptionMaterials; 071import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; 072import com.amazonaws.services.s3.model.GetObjectRequest; 073import com.amazonaws.services.s3.model.ObjectListing; 074import com.amazonaws.services.s3.model.ObjectMetadata; 075import com.amazonaws.services.s3.model.PutObjectRequest; 076import com.amazonaws.services.s3.model.S3ObjectSummary; 077import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider; 078import com.amazonaws.services.s3.transfer.Download; 079import com.amazonaws.services.s3.transfer.TransferManager; 080import com.amazonaws.services.s3.transfer.TransferManagerBuilder; 081import com.amazonaws.services.s3.transfer.Upload; 082import com.google.common.base.MoreObjects; 083 084/** 085 * A Binary Manager that stores binaries as S3 BLOBs 086 * <p> 087 * The BLOBs are cached locally on first access for efficiency. 088 * <p> 089 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 090 * if accessed before the stream. 091 */ 092public class S3BinaryManager extends AbstractCloudBinaryManager { 093 094 private static final String MD5 = "MD5"; // must be MD5 for Etag 095 096 @Override 097 protected String getDefaultDigestAlgorithm() { 098 return MD5; 099 } 100 101 private static final Log log = LogFactory.getLog(S3BinaryManager.class); 102 103 public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage"; 104 105 public static final String BUCKET_NAME_PROPERTY = "bucket"; 106 107 public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix"; 108 109 public static final String BUCKET_REGION_PROPERTY = "region"; 110 111 public static final String DEFAULT_BUCKET_REGION = "us-east-1"; // US East 112 113 public static final String AWS_ID_PROPERTY = "awsid"; 114 115 public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID"; 116 117 public static final String AWS_SECRET_PROPERTY = "awssecret"; 118 119 public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY"; 120 121 /** AWS ClientConfiguration default 50 */ 122 public static final String CONNECTION_MAX_PROPERTY = "connection.max"; 123 124 /** AWS ClientConfiguration default 3 (with exponential backoff) */ 125 public static final String CONNECTION_RETRY_PROPERTY = "connection.retry"; 126 127 /** AWS ClientConfiguration default 50*1000 = 50s */ 128 public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout"; 129 130 /** AWS ClientConfiguration default 50*1000 = 50s */ 131 public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout"; 132 133 public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file"; 134 135 public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password"; 136 137 public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside"; 138 139 public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias"; 140 141 public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password"; 142 143 public static final String ENDPOINT_PROPERTY = "endpoint"; 144 145 public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3"; 146 147 public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire"; 148 149 private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}"); 150 151 protected String bucketName; 152 153 protected String bucketNamePrefix; 154 155 protected AWSCredentialsProvider awsCredentialsProvider; 156 157 protected ClientConfiguration clientConfiguration; 158 159 protected EncryptionMaterials encryptionMaterials; 160 161 protected boolean isEncrypted; 162 163 protected CryptoConfiguration cryptoConfiguration; 164 165 protected boolean userServerSideEncryption; 166 167 protected AmazonS3 amazonS3; 168 169 protected TransferManager transferManager; 170 171 @Override 172 public void close() { 173 // this also shuts down the AmazonS3Client 174 transferManager.shutdownNow(); 175 super.close(); 176 } 177 178 /** 179 * Aborts uploads that crashed and are older than 1 day. 180 * 181 * @since 7.2 182 */ 183 protected void abortOldUploads() throws IOException { 184 int oneDay = 1000 * 60 * 60 * 24; 185 try { 186 transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay)); 187 } catch (AmazonClientException e) { 188 throw new IOException("Failed to abort old uploads", e); 189 } 190 } 191 192 @Override 193 protected void setupCloudClient() throws IOException { 194 // Get settings from the configuration 195 bucketName = getProperty(BUCKET_NAME_PROPERTY); 196 bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY); 197 String bucketRegion = getProperty(BUCKET_REGION_PROPERTY); 198 if (isBlank(bucketRegion)) { 199 bucketRegion = DEFAULT_BUCKET_REGION; 200 } 201 String awsID = getProperty(AWS_ID_PROPERTY); 202 String awsSecret = getProperty(AWS_SECRET_PROPERTY); 203 204 String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST); 205 String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT); 206 String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN); 207 String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD); 208 209 int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY); 210 int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY); 211 int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY); 212 int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY); 213 214 String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY); 215 String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY); 216 String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY); 217 String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY); 218 String endpoint = getProperty(ENDPOINT_PROPERTY); 219 String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY); 220 if (isNotBlank(sseprop)) { 221 userServerSideEncryption = Boolean.parseBoolean(sseprop); 222 } 223 224 // Fallback on default env keys for ID and secret 225 if (isBlank(awsID)) { 226 awsID = System.getenv(AWS_ID_ENV); 227 } 228 if (isBlank(awsSecret)) { 229 awsSecret = System.getenv(AWS_SECRET_ENV); 230 } 231 232 if (isBlank(bucketName)) { 233 throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY); 234 } 235 236 if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) { 237 log.warn(String.format("%s %s S3 bucket prefix should end by '/' " + ": added automatically.", 238 BUCKET_PREFIX_PROPERTY, bucketNamePrefix)); 239 bucketNamePrefix += "/"; 240 } 241 // set up credentials 242 if (isBlank(awsID) || isBlank(awsSecret)) { 243 awsCredentialsProvider = InstanceProfileCredentialsProvider.getInstance(); 244 try { 245 awsCredentialsProvider.getCredentials(); 246 } catch (AmazonClientException e) { 247 throw new RuntimeException("Missing AWS credentials and no instance role found"); 248 } 249 } else { 250 awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret); 251 } 252 253 // set up client configuration 254 clientConfiguration = new ClientConfiguration(); 255 if (isNotBlank(proxyHost)) { 256 clientConfiguration.setProxyHost(proxyHost); 257 } 258 if (isNotBlank(proxyPort)) { 259 clientConfiguration.setProxyPort(Integer.parseInt(proxyPort)); 260 } 261 if (isNotBlank(proxyLogin)) { 262 clientConfiguration.setProxyUsername(proxyLogin); 263 } 264 if (proxyPassword != null) { // could be blank 265 clientConfiguration.setProxyPassword(proxyPassword); 266 } 267 if (maxConnections > 0) { 268 clientConfiguration.setMaxConnections(maxConnections); 269 } 270 if (maxErrorRetry >= 0) { // 0 is allowed 271 clientConfiguration.setMaxErrorRetry(maxErrorRetry); 272 } 273 if (connectionTimeout >= 0) { // 0 is allowed 274 clientConfiguration.setConnectionTimeout(connectionTimeout); 275 } 276 if (socketTimeout >= 0) { // 0 is allowed 277 clientConfiguration.setSocketTimeout(socketTimeout); 278 } 279 280 // set up encryption 281 encryptionMaterials = null; 282 if (isNotBlank(keystoreFile)) { 283 boolean confok = true; 284 if (keystorePass == null) { // could be blank 285 log.error("Keystore password missing"); 286 confok = false; 287 } 288 if (isBlank(privkeyAlias)) { 289 log.error("Key alias missing"); 290 confok = false; 291 } 292 if (privkeyPass == null) { // could be blank 293 log.error("Key password missing"); 294 confok = false; 295 } 296 if (!confok) { 297 throw new RuntimeException("S3 Crypto configuration incomplete"); 298 } 299 try { 300 // Open keystore 301 File ksFile = new File(keystoreFile); 302 FileInputStream ksStream = new FileInputStream(ksFile); 303 KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); 304 keystore.load(ksStream, keystorePass.toCharArray()); 305 ksStream.close(); 306 // Get keypair for alias 307 if (!keystore.isKeyEntry(privkeyAlias)) { 308 throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias"); 309 } 310 PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray()); 311 Certificate cert = keystore.getCertificate(privkeyAlias); 312 PublicKey pubKey = cert.getPublicKey(); 313 KeyPair keypair = new KeyPair(pubKey, privKey); 314 // Get encryptionMaterials from keypair 315 encryptionMaterials = new EncryptionMaterials(keypair); 316 cryptoConfiguration = new CryptoConfiguration(); 317 } catch (IOException | GeneralSecurityException e) { 318 throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e); 319 } 320 } 321 isEncrypted = encryptionMaterials != null; 322 323 // Try to create bucket if it doesn't exist 324 if (!isEncrypted) { 325 amazonS3 = AmazonS3ClientBuilder.standard() 326 .withCredentials(awsCredentialsProvider) 327 .withClientConfiguration(clientConfiguration) 328 .withRegion(bucketRegion) 329 .build(); 330 } else { 331 amazonS3 = AmazonS3EncryptionClientBuilder.standard() 332 .withClientConfiguration(clientConfiguration) 333 .withCryptoConfiguration(cryptoConfiguration) 334 .withCredentials(awsCredentialsProvider) 335 .withRegion(bucketRegion) 336 .withEncryptionMaterials(new StaticEncryptionMaterialsProvider(encryptionMaterials)) 337 .build(); 338 } 339 if (isNotBlank(endpoint)) { 340 amazonS3.setEndpoint(endpoint); 341 } 342 343 try { 344 if (!amazonS3.doesBucketExist(bucketName)) { 345 amazonS3.createBucket(bucketName); 346 amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private); 347 } 348 } catch (AmazonClientException e) { 349 throw new IOException(e); 350 } 351 352 // compat for NXP-17895, using "downloadfroms3", to be removed 353 // these two fields have already been initialized by the base class initialize() 354 // using standard property "directdownload" 355 String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT); 356 if (dd != null) { 357 directDownload = Boolean.parseBoolean(dd); 358 } 359 int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT); 360 if (dde >= 0) { 361 directDownloadExpire = dde; 362 } 363 364 transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build(); 365 abortOldUploads(); 366 } 367 368 protected void removeBinary(String digest) { 369 amazonS3.deleteObject(bucketName, bucketNamePrefix + digest); 370 } 371 372 @Override 373 protected String getSystemPropertyPrefix() { 374 return SYSTEM_PROPERTY_PREFIX; 375 } 376 377 @Override 378 protected BinaryGarbageCollector instantiateGarbageCollector() { 379 return new S3BinaryGarbageCollector(this); 380 } 381 382 @Override 383 public void removeBinaries(Collection<String> digests) { 384 digests.forEach(this::removeBinary); 385 } 386 387 protected static boolean isMissingKey(AmazonClientException e) { 388 if (e instanceof AmazonServiceException) { 389 AmazonServiceException ase = (AmazonServiceException) e; 390 return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode()) 391 || "Not Found".equals(e.getMessage()); 392 } 393 return false; 394 } 395 396 public static boolean isMD5(String digest) { 397 return MD5_RE.matcher(digest).matches(); 398 } 399 400 @Override 401 protected FileStorage getFileStorage() { 402 return new S3FileStorage(); 403 } 404 405 public class S3FileStorage implements FileStorage { 406 407 @Override 408 public void storeFile(String digest, File file) throws IOException { 409 long t0 = 0; 410 if (log.isDebugEnabled()) { 411 t0 = System.currentTimeMillis(); 412 log.debug("storing blob " + digest + " to S3"); 413 } 414 String key = bucketNamePrefix + digest; 415 try { 416 amazonS3.getObjectMetadata(bucketName, key); 417 if (log.isDebugEnabled()) { 418 log.debug("blob " + digest + " is already in S3"); 419 } 420 } catch (AmazonClientException e) { 421 if (!isMissingKey(e)) { 422 throw new IOException(e); 423 } 424 // not already present -> store the blob 425 PutObjectRequest request; 426 if (!isEncrypted) { 427 request = new PutObjectRequest(bucketName, key, file); 428 if (userServerSideEncryption) { 429 ObjectMetadata objectMetadata = new ObjectMetadata(); 430 objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); 431 request.setMetadata(objectMetadata); 432 } 433 } else { 434 request = new EncryptedPutObjectRequest(bucketName, key, file); 435 } 436 Upload upload = transferManager.upload(request); 437 try { 438 upload.waitForUploadResult(); 439 } catch (AmazonClientException ee) { 440 throw new IOException(ee); 441 } catch (InterruptedException ee) { 442 // reset interrupted status 443 Thread.currentThread().interrupt(); 444 // continue interrupt 445 throw new RuntimeException(ee); 446 } finally { 447 if (log.isDebugEnabled()) { 448 long dtms = System.currentTimeMillis() - t0; 449 log.debug("stored blob " + digest + " to S3 in " + dtms + "ms"); 450 } 451 } 452 } 453 } 454 455 @Override 456 public boolean fetchFile(String digest, File file) throws IOException { 457 long t0 = 0; 458 if (log.isDebugEnabled()) { 459 t0 = System.currentTimeMillis(); 460 log.debug("fetching blob " + digest + " from S3"); 461 } 462 try { 463 Download download = transferManager.download(new GetObjectRequest(bucketName, bucketNamePrefix + digest), file); 464 download.waitForCompletion(); 465 // Check ETag it is by default MD5 if not multipart 466 if (!isEncrypted && !digest.equals(download.getObjectMetadata().getETag())) { 467 // In case of multipart it will happen, verify the downloaded file 468 String currentDigest; 469 try (FileInputStream input = new FileInputStream(file)) { 470 currentDigest = DigestUtils.md5Hex(input); 471 } 472 if (!currentDigest.equals(digest)) { 473 log.error("Invalid ETag in S3, currentDigest=" + currentDigest + " expectedDigest=" + digest); 474 throw new IOException("Invalid S3 object, it is corrupted expected digest is " + digest + " got " + currentDigest); 475 } 476 } 477 return true; 478 } catch (AmazonClientException e) { 479 if (!isMissingKey(e)) { 480 throw new IOException(e); 481 } 482 return false; 483 } catch (InterruptedException e) { 484 // reset interrupted status 485 Thread.currentThread().interrupt(); 486 // continue interrupt 487 throw new RuntimeException(e); 488 } finally { 489 if (log.isDebugEnabled()) { 490 long dtms = System.currentTimeMillis() - t0; 491 log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms"); 492 } 493 } 494 495 } 496 } 497 498 /** 499 * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory. 500 */ 501 public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> { 502 503 protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) { 504 super(binaryManager); 505 } 506 507 @Override 508 public String getId() { 509 return "s3:" + binaryManager.bucketName; 510 } 511 512 @Override 513 public Set<String> getUnmarkedBlobs() { 514 // list S3 objects in the bucket 515 // record those not marked 516 Set<String> unmarked = new HashSet<>(); 517 ObjectListing list = null; 518 do { 519 if (list == null) { 520 list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix); 521 } else { 522 list = binaryManager.amazonS3.listNextBatchOfObjects(list); 523 } 524 int prefixLength = binaryManager.bucketNamePrefix.length(); 525 for (S3ObjectSummary summary : list.getObjectSummaries()) { 526 String digest = summary.getKey().substring(prefixLength); 527 if (!isMD5(digest)) { 528 // ignore files that cannot be MD5 digests for 529 // safety 530 continue; 531 } 532 long length = summary.getSize(); 533 if (marked.contains(digest)) { 534 status.numBinaries++; 535 status.sizeBinaries += length; 536 } else { 537 status.numBinariesGC++; 538 status.sizeBinariesGC += length; 539 // record file to delete 540 unmarked.add(digest); 541 marked.remove(digest); // optimize memory 542 } 543 } 544 } while (list.isTruncated()); 545 546 return unmarked; 547 } 548 } 549 550 // ******************** BlobProvider ******************** 551 552 @Override 553 protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException { 554 String key = bucketNamePrefix + digest; 555 Date expiration = new Date(); 556 expiration.setTime(expiration.getTime() + directDownloadExpire * 1000); 557 GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET); 558 request.addRequestParameter("response-content-type", getContentTypeHeader(blob)); 559 request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null)); 560 request.setExpiration(expiration); 561 URL url = amazonS3.generatePresignedUrl(request); 562 try { 563 return url.toURI(); 564 } catch (URISyntaxException e) { 565 throw new IOException(e); 566 } 567 } 568 569}