001/* 002 * (C) Copyright 2011-2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Luís Duarte 018 * Florent Guillaume 019 * Mickaël Schoentgen 020 */ 021package org.nuxeo.ecm.core.storage.sql; 022 023import static org.apache.commons.lang3.StringUtils.defaultIfBlank; 024import static org.apache.commons.lang3.StringUtils.defaultIfEmpty; 025import static org.apache.commons.lang3.StringUtils.defaultString; 026import static org.apache.commons.lang3.StringUtils.isBlank; 027import static org.apache.commons.lang3.StringUtils.isEmpty; 028import static org.apache.commons.lang3.StringUtils.isNotBlank; 029import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.ACCELERATE_MODE_PROPERTY; 030import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_ID_PROPERTY; 031import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SECRET_PROPERTY; 032import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SESSION_TOKEN_PROPERTY; 033import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_NAME_PROPERTY; 034import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_PREFIX_PROPERTY; 035import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_REGION_PROPERTY; 036import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.ENDPOINT_PROPERTY; 037import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.PATHSTYLEACCESS_PROPERTY; 038 039import java.io.IOException; 040import java.io.Serializable; 041import java.util.HashMap; 042import java.util.Map; 043import java.util.Objects; 044 045import org.apache.commons.lang3.StringUtils; 046import org.apache.commons.logging.Log; 047import org.apache.commons.logging.LogFactory; 048import org.nuxeo.ecm.automation.server.jaxrs.batch.Batch; 049import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.AbstractBatchHandler; 050import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchFileInfo; 051import org.nuxeo.ecm.blob.s3.S3ManagedTransfer; 052import org.nuxeo.ecm.core.api.Blob; 053import org.nuxeo.ecm.core.api.NuxeoException; 054import org.nuxeo.ecm.core.blob.BlobInfo; 055import org.nuxeo.ecm.core.blob.BlobManager; 056import org.nuxeo.ecm.core.blob.BlobProvider; 057import org.nuxeo.ecm.core.blob.BlobStoreBlobProvider; 058import org.nuxeo.runtime.api.Framework; 059import org.nuxeo.runtime.aws.NuxeoAWSRegionProvider; 060 061import com.amazonaws.auth.AWSCredentialsProvider; 062import com.amazonaws.client.builder.AwsClientBuilder; 063import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; 064import com.amazonaws.services.s3.AmazonS3; 065import com.amazonaws.services.s3.AmazonS3ClientBuilder; 066import com.amazonaws.services.s3.model.AmazonS3Exception; 067import com.amazonaws.services.s3.model.CopyObjectRequest; 068import com.amazonaws.services.s3.model.ObjectMetadata; 069import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; 070import com.amazonaws.services.s3.transfer.Copy; 071import com.amazonaws.services.s3.transfer.TransferManager; 072import com.amazonaws.services.securitytoken.AWSSecurityTokenService; 073import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; 074import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; 075import com.amazonaws.services.securitytoken.model.Credentials; 076 077/** 078 * Batch Handler allowing direct S3 upload. 079 * 080 * @since 10.1 081 */ 082public class S3DirectBatchHandler extends AbstractBatchHandler { 083 084 private static final Log log = LogFactory.getLog(S3DirectBatchHandler.class); 085 086 // properties passed at initialization time from extension point 087 088 /** @deprecated since 11.1, use {@link S3BinaryManager#ACCELERATE_MODE_PROPERTY} */ 089 @Deprecated 090 public static final String ACCELERATE_MODE_ENABLED_PROPERTY = "accelerateMode"; 091 092 public static final String POLICY_TEMPLATE_PROPERTY = "policyTemplate"; 093 094 /** 095 * @since 10.10 096 */ 097 public static final String ROLE_ARN_PROPERTY = "roleArn"; 098 099 /** 100 * @since 11.1 101 */ 102 public static final String BLOB_PROVIDER_ID_PROPERTY = "blobProvider"; 103 104 // keys in the batch properties, returned to the client 105 106 public static final String INFO_AWS_SECRET_KEY_ID = "awsSecretKeyId"; 107 108 public static final String INFO_AWS_SECRET_ACCESS_KEY = "awsSecretAccessKey"; 109 110 public static final String INFO_AWS_SESSION_TOKEN = "awsSessionToken"; 111 112 public static final String INFO_BUCKET = "bucket"; 113 114 public static final String INFO_BASE_KEY = "baseKey"; 115 116 public static final String INFO_EXPIRATION = "expiration"; 117 118 /** @since 11.1 */ 119 public static final String INFO_AWS_ENDPOINT = "endpoint"; 120 121 /** @since 11.1 */ 122 public static final String INFO_AWS_PATH_STYLE_ACCESS = "usePathStyleAccess"; 123 124 public static final String INFO_AWS_REGION = "region"; 125 126 public static final String INFO_USE_S3_ACCELERATE = "useS3Accelerate"; 127 128 protected AWSSecurityTokenService stsClient; 129 130 protected AmazonS3 amazonS3; 131 132 protected String endpoint; 133 134 protected boolean pathStyleAccessEnabled; 135 136 protected String region; 137 138 protected String bucket; 139 140 protected String bucketPrefix; 141 142 protected boolean accelerateModeEnabled; 143 144 protected int expiration; 145 146 protected String policy; 147 148 protected String roleArn; 149 150 protected boolean useServerSideEncryption; 151 152 protected String serverSideKMSKeyID; 153 154 protected String blobProviderId; 155 156 @Override 157 protected void initialize(Map<String, String> properties) { 158 super.initialize(properties); 159 endpoint = properties.get(ENDPOINT_PROPERTY); 160 pathStyleAccessEnabled = Boolean.parseBoolean(properties.get(PATHSTYLEACCESS_PROPERTY)); 161 region = properties.get(BUCKET_REGION_PROPERTY); 162 if (isBlank(region)) { 163 region = NuxeoAWSRegionProvider.getInstance().getRegion(); 164 } 165 bucket = properties.get(BUCKET_NAME_PROPERTY); 166 if (isBlank(bucket)) { 167 throw new NuxeoException("Missing configuration property: " + BUCKET_NAME_PROPERTY); 168 } 169 roleArn = properties.get(ROLE_ARN_PROPERTY); 170 if (isBlank(roleArn)) { 171 throw new NuxeoException("Missing configuration property: " + ROLE_ARN_PROPERTY); 172 } 173 bucketPrefix = defaultString(properties.get(BUCKET_PREFIX_PROPERTY)); 174 accelerateModeEnabled = Boolean.parseBoolean(properties.get(ACCELERATE_MODE_PROPERTY)); 175 String awsSecretKeyId = properties.get(AWS_ID_PROPERTY); 176 String awsSecretAccessKey = properties.get(AWS_SECRET_PROPERTY); 177 String awsSessionToken = properties.get(AWS_SESSION_TOKEN_PROPERTY); 178 expiration = Integer.parseInt(defaultIfEmpty(properties.get(INFO_EXPIRATION), "0")); 179 policy = properties.get(POLICY_TEMPLATE_PROPERTY); 180 181 useServerSideEncryption = Boolean.parseBoolean(properties.get(S3BinaryManager.SERVERSIDE_ENCRYPTION_PROPERTY)); 182 serverSideKMSKeyID = properties.get(S3BinaryManager.SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY); 183 184 AWSCredentialsProvider credentials = S3Utils.getAWSCredentialsProvider(awsSecretKeyId, awsSecretAccessKey, 185 awsSessionToken); 186 stsClient = initializeSTSClient(credentials); 187 amazonS3 = initializeS3Client(credentials); 188 189 if (!isBlank(bucketPrefix) && !bucketPrefix.endsWith("/")) { 190 log.debug(String.format("%s %s S3 bucket prefix should end with '/': added automatically.", 191 BUCKET_PREFIX_PROPERTY, bucketPrefix)); 192 bucketPrefix += "/"; 193 } 194 195 blobProviderId = defaultString(properties.get(BLOB_PROVIDER_ID_PROPERTY), transientStoreName); 196 } 197 198 protected AWSSecurityTokenService initializeSTSClient(AWSCredentialsProvider credentials) { 199 AWSSecurityTokenServiceClientBuilder builder = AWSSecurityTokenServiceClientBuilder.standard(); 200 initializeBuilder(builder, credentials); 201 return builder.build(); 202 } 203 204 protected AmazonS3 initializeS3Client(AWSCredentialsProvider credentials) { 205 AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); 206 initializeBuilder(builder, credentials); 207 builder.setPathStyleAccessEnabled(pathStyleAccessEnabled); 208 builder.setAccelerateModeEnabled(accelerateModeEnabled); 209 return builder.build(); 210 } 211 212 protected void initializeBuilder(AwsClientBuilder<?, ?> builder, AWSCredentialsProvider credentials) { 213 if (isBlank(endpoint)) { 214 builder.setRegion(region); 215 } else { 216 builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region)); 217 } 218 builder.setCredentials(credentials); 219 } 220 221 @Override 222 public Batch getBatch(String batchId) { 223 Map<String, Serializable> parameters = getBatchParameters(batchId); 224 if (parameters == null) { 225 return null; 226 } 227 228 // create the batch 229 Batch batch = new Batch(batchId, parameters, getName(), getTransientStore()); 230 231 Credentials credentials = getAwsCredentials(batchId); 232 233 Map<String, Object> properties = batch.getProperties(); 234 properties.put(INFO_AWS_SECRET_KEY_ID, credentials.getAccessKeyId()); 235 properties.put(INFO_AWS_SECRET_ACCESS_KEY, credentials.getSecretAccessKey()); 236 properties.put(INFO_AWS_SESSION_TOKEN, credentials.getSessionToken()); 237 properties.put(INFO_BUCKET, bucket); 238 properties.put(INFO_BASE_KEY, bucketPrefix); 239 properties.put(INFO_EXPIRATION, credentials.getExpiration().toInstant().toEpochMilli()); 240 properties.put(INFO_AWS_ENDPOINT, defaultIfBlank(endpoint, null)); 241 properties.put(INFO_AWS_PATH_STYLE_ACCESS, pathStyleAccessEnabled); 242 properties.put(INFO_AWS_REGION, region); 243 properties.put(INFO_USE_S3_ACCELERATE, accelerateModeEnabled); 244 245 return batch; 246 } 247 248 protected Credentials assumeRole(AssumeRoleRequest request) { 249 return stsClient.assumeRole(request).getCredentials(); 250 } 251 252 @Override 253 public boolean completeUpload(String batchId, String fileIndex, BatchFileInfo fileInfo) { 254 String fileKey = fileInfo.getKey(); 255 ObjectMetadata metadata = amazonS3.getObjectMetadata(bucket, fileKey); 256 257 String key; 258 String eTag = metadata.getETag(); 259 260 if (!useDeDuplication()) { 261 key = StringUtils.removeStart(fileKey, bucketPrefix); 262 } else { 263 key = eTag; 264 if (isEmpty(key)) { 265 return false; 266 } 267 String bucketKey = bucketPrefix + key; 268 CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, fileKey, bucket, bucketKey); 269 // server-side encryption 270 if (useServerSideEncryption) { 271 if (isNotBlank(serverSideKMSKeyID)) { 272 // SSE-KMS 273 SSEAwsKeyManagementParams params = new SSEAwsKeyManagementParams(serverSideKMSKeyID); 274 copyObjectRequest.setSSEAwsKeyManagementParams(params); 275 } else { 276 // SSE-S3 277 ObjectMetadata newObjectMetadata = new ObjectMetadata(); 278 newObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); 279 copyObjectRequest.setNewObjectMetadata(newObjectMetadata); 280 } 281 // TODO SSE-C 282 } 283 284 eTag = doMove(copyObjectRequest); 285 286 // if we did a multipart upload but can do a non multipart copy we can get back the digest as key 287 boolean isMultipartUpload = key.matches(".+-\\d+$"); 288 boolean canDoNonMultipartCopy = metadata.getContentLength() < getTransferManager().getConfiguration() 289 .getMultipartCopyThreshold(); 290 if (isMultipartUpload && canDoNonMultipartCopy) { 291 key = eTag; 292 String previousBucketKey = bucketKey; 293 bucketKey = bucketPrefix + key; 294 if (amazonS3.doesObjectExist(bucket, bucketKey)) { 295 // another thread has uploaded the same blob and has done the move 296 // clean up remaining S3 object if needed 297 deleteObjectAfterMove(previousBucketKey); 298 } else { 299 // move S3 object to eTag as key 300 var renameRequest = new CopyObjectRequest(bucket, previousBucketKey, bucket, bucketKey); 301 eTag = doMove(renameRequest); 302 } 303 } 304 } 305 306 BlobInfo blobInfo = new BlobInfo(); 307 blobInfo.mimeType = metadata.getContentType(); 308 blobInfo.encoding = metadata.getContentEncoding(); 309 blobInfo.filename = fileInfo.getFilename(); 310 blobInfo.length = metadata.getContentLength(); 311 blobInfo.key = key; 312 blobInfo.digest = defaultString(metadata.getContentMD5(), eTag); 313 314 Blob blob; 315 316 try { 317 blob = getBlobProvider().readBlob(blobInfo); 318 } catch (IOException e) { 319 throw new NuxeoException(e); 320 } 321 322 Batch batch = getBatch(batchId); 323 try { 324 batch.addFile(fileIndex, blob, blob.getFilename(), blob.getMimeType()); 325 } catch (NuxeoException e) { 326 try { 327 amazonS3.deleteObject(bucket, fileKey); 328 } catch (AmazonS3Exception s3E) { 329 e.addSuppressed(s3E); 330 } 331 throw e; 332 } 333 334 return true; 335 } 336 337 /** 338 * @return the ETag of copied object 339 */ 340 protected String doMove(CopyObjectRequest request) { 341 Copy rename = getTransferManager().copy(request); 342 try { 343 var copyResult = rename.waitForCopyResult(); 344 return copyResult.getETag(); 345 } catch (InterruptedException e) { 346 Thread.currentThread().interrupt(); 347 throw new NuxeoException(e); 348 } finally { 349 deleteObjectAfterMove(request.getSourceKey()); 350 } 351 } 352 353 protected void deleteObjectAfterMove(String key) { 354 try { 355 amazonS3.deleteObject(bucket, key); 356 } catch (AmazonS3Exception e) { 357 log.debug("Unable to cleanup object, move has already been done", e); 358 } 359 } 360 361 protected BlobProvider getBlobProvider() { 362 return Framework.getService(BlobManager.class).getBlobProvider(blobProviderId); 363 } 364 365 protected boolean useDeDuplication() { 366 BlobProvider blobProvider = getBlobProvider(); 367 if (blobProvider instanceof BlobStoreBlobProvider) { 368 return ((BlobStoreBlobProvider) blobProvider).getKeyStrategy().useDeDuplication(); 369 } 370 return true; 371 } 372 373 protected TransferManager getTransferManager() { 374 BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderId); 375 if (!(blobProvider instanceof S3ManagedTransfer)) { 376 throw new NuxeoException("BlobProvider does not implement S3ManagedTransfer"); 377 } 378 return ((S3ManagedTransfer) blobProvider).getTransferManager(); 379 } 380 381 protected Credentials getAwsCredentials(String batchId) { 382 AssumeRoleRequest request = new AssumeRoleRequest().withRoleArn(roleArn) 383 .withPolicy(policy) 384 .withRoleSessionName(batchId); 385 if (expiration > 0) { 386 request.setDurationSeconds(expiration); 387 } 388 return assumeRole(request); 389 } 390 391 /** @since 11.1 */ 392 @Override 393 public Map<String, Object> refreshToken(String batchId) { 394 Objects.requireNonNull(batchId, "required batch ID"); 395 396 Credentials credentials = getAwsCredentials(batchId); 397 Map<String, Object> result = new HashMap<>(); 398 result.put(INFO_AWS_SECRET_KEY_ID, credentials.getAccessKeyId()); 399 result.put(INFO_AWS_SECRET_ACCESS_KEY, credentials.getSecretAccessKey()); 400 result.put(INFO_AWS_SESSION_TOKEN, credentials.getSessionToken()); 401 result.put(INFO_EXPIRATION, credentials.getExpiration().toInstant().toEpochMilli()); 402 return result; 403 } 404 405}