/*
 * (C) Copyright 2011-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Luís Duarte
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang3.StringUtils.defaultIfEmpty;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_ID_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SECRET_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SESSION_TOKEN_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_NAME_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_PREFIX_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_REGION_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3Utils.NON_MULTIPART_COPY_MAX_SIZE;

import java.io.Serializable;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.server.jaxrs.batch.Batch;
import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.AbstractBatchHandler;
import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchFileInfo;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.binary.Binary;
import org.nuxeo.ecm.core.blob.binary.BinaryBlob;
import org.nuxeo.ecm.core.blob.binary.LazyBinary;
import org.nuxeo.runtime.aws.NuxeoAWSRegionProvider;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.Credentials;

/**
 * Batch Handler allowing direct S3 upload.
 * <p>
 * Vends short-lived STS credentials to the client so it can upload straight to S3, then, on completion, copies the
 * uploaded object to a content-addressed key (its ETag/MD5) and registers the resulting blob on the batch.
 *
 * @since 10.1
 */
public class S3DirectBatchHandler extends AbstractBatchHandler {

    // NOTE: was LogFactory.getLog(S3BinaryManager.class) — copy-paste error; warnings logged by this
    // handler were attributed to S3BinaryManager, defeating per-class log filtering.
    private static final Log log = LogFactory.getLog(S3DirectBatchHandler.class);

    /** Matches the {@code -<partCount>} suffix S3 appends to the ETag of a multipart upload. */
    protected static final Pattern REGEX_MULTIPART_ETAG = Pattern.compile("-\\d+$");

    protected static final Pattern REGEX_BUCKET_PATH_PLACE_HOLDER = Pattern.compile("\\{\\{bucketPath}}");

    // properties passed at initialization time from extension point

    public static final String ACCELERATE_MODE_ENABLED_PROPERTY = "accelerateMode";

    public static final String POLICY_TEMPLATE_PROPERTY = "policyTemplate";

    /**
     * @since 10.10
     */
    public static final String ROLE_ARN_PROPERTY = "roleArn";

    // keys in the batch properties, returned to the client

    public static final String INFO_AWS_SECRET_KEY_ID = "awsSecretKeyId";

    public static final String INFO_AWS_SECRET_ACCESS_KEY = "awsSecretAccessKey";

    public static final String INFO_AWS_SESSION_TOKEN = "awsSessionToken";

    public static final String INFO_BUCKET = "bucket";

    public static final String INFO_BASE_KEY = "baseKey";

    public static final String INFO_EXPIRATION = "expiration";

    public static final String INFO_AWS_REGION = "region";

    public static final String INFO_USE_S3_ACCELERATE = "useS3Accelerate";

    protected AWSSecurityTokenService stsClient;

    protected AmazonS3 amazonS3;

    protected String region;

    protected String bucket;

    protected String bucketPrefix;

    protected boolean accelerateModeEnabled;

    protected int expiration;

    protected String policy;

    protected String roleArn;

    /**
     * Reads the extension-point properties, resolves the region, and builds the STS and S3 clients.
     *
     * @param properties the handler configuration from the extension point
     * @throws NuxeoException if the mandatory bucket name or role ARN is missing
     */
    @Override
    protected void initialize(Map<String, String> properties) {
        super.initialize(properties);
        region = properties.get(BUCKET_REGION_PROPERTY);
        if (isBlank(region)) {
            // fall back to the platform-wide AWS region configuration
            region = NuxeoAWSRegionProvider.getInstance().getRegion();
        }
        bucket = properties.get(BUCKET_NAME_PROPERTY);
        if (isBlank(bucket)) {
            throw new NuxeoException("Missing configuration property: " + BUCKET_NAME_PROPERTY);
        }
        roleArn = properties.get(ROLE_ARN_PROPERTY);
        if (isBlank(roleArn)) {
            throw new NuxeoException("Missing configuration property: " + ROLE_ARN_PROPERTY);
        }
        bucketPrefix = defaultString(properties.get(BUCKET_PREFIX_PROPERTY));
        accelerateModeEnabled = Boolean.parseBoolean(properties.get(ACCELERATE_MODE_ENABLED_PROPERTY));
        String awsSecretKeyId = properties.get(AWS_ID_PROPERTY);
        String awsSecretAccessKey = properties.get(AWS_SECRET_PROPERTY);
        String awsSessionToken = properties.get(AWS_SESSION_TOKEN_PROPERTY);
        // 0 means "use the STS default session duration"
        expiration = Integer.parseInt(defaultIfEmpty(properties.get(INFO_EXPIRATION), "0"));
        policy = properties.get(POLICY_TEMPLATE_PROPERTY);

        AWSCredentialsProvider credentials = S3Utils.getAWSCredentialsProvider(awsSecretKeyId, awsSecretAccessKey,
                awsSessionToken);
        stsClient = initializeSTSClient(credentials);
        amazonS3 = initializeS3Client(credentials);

        if (!isBlank(bucketPrefix) && !bucketPrefix.endsWith("/")) {
            log.warn(String.format("%s %s S3 bucket prefix should end with '/': added automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketPrefix));
            bucketPrefix += "/";
        }
    }

    /**
     * Builds the STS client used to assume the configured role.
     *
     * @param credentials the AWS credentials provider
     * @return a region-scoped {@link AWSSecurityTokenService}
     */
    protected AWSSecurityTokenService initializeSTSClient(AWSCredentialsProvider credentials) {
        return AWSSecurityTokenServiceClientBuilder.standard()
                                                   .withRegion(region)
                                                   .withCredentials(credentials)
                                                   .build();
    }

    /**
     * Builds the S3 client used to inspect and copy completed uploads.
     *
     * @param credentials the AWS credentials provider
     * @return a region-scoped {@link AmazonS3} client, with transfer acceleration if configured
     */
    protected AmazonS3 initializeS3Client(AWSCredentialsProvider credentials) {
        return AmazonS3ClientBuilder.standard()
                                    .withRegion(region)
                                    .withCredentials(credentials)
                                    .withAccelerateModeEnabled(accelerateModeEnabled)
                                    .build();
    }

    /**
     * Returns the batch with the given id, with temporary STS credentials and S3 upload parameters exposed in its
     * properties so the client can upload directly to S3.
     *
     * @param batchId the batch id
     * @return the batch, or {@code null} if unknown
     */
    @Override
    public Batch getBatch(String batchId) {
        Map<String, Serializable> parameters = getBatchParameters(batchId);
        if (parameters == null) {
            return null;
        }

        // create the batch
        Batch batch = new Batch(batchId, parameters, getName(), getTransientStore());

        // the batch id doubles as the STS session name, tying the credentials to this batch
        AssumeRoleRequest request = new AssumeRoleRequest().withRoleArn(roleArn)
                                                           .withPolicy(policy)
                                                           .withRoleSessionName(batchId);
        if (expiration > 0) {
            request.setDurationSeconds(expiration);
        }

        Credentials credentials = assumeRole(request);

        Map<String, Object> properties = batch.getProperties();
        properties.put(INFO_AWS_SECRET_KEY_ID, credentials.getAccessKeyId());
        properties.put(INFO_AWS_SECRET_ACCESS_KEY, credentials.getSecretAccessKey());
        properties.put(INFO_AWS_SESSION_TOKEN, credentials.getSessionToken());
        properties.put(INFO_BUCKET, bucket);
        properties.put(INFO_BASE_KEY, bucketPrefix);
        properties.put(INFO_EXPIRATION, credentials.getExpiration().toInstant().toEpochMilli());
        properties.put(INFO_AWS_REGION, region);
        properties.put(INFO_USE_S3_ACCELERATE, accelerateModeEnabled);

        return batch;
    }

    /**
     * Calls STS to assume the configured role; extracted for testability.
     *
     * @param request the assume-role request
     * @return the temporary credentials
     */
    protected Credentials assumeRole(AssumeRoleRequest request) {
        return stsClient.assumeRole(request).getCredentials();
    }

    /**
     * Finalizes a direct upload: copies the uploaded object to a content-addressed key (bucketPrefix + digest) and
     * registers the resulting blob on the batch.
     *
     * @param batchId the batch id
     * @param fileIndex the index of the file in the batch
     * @param fileInfo the client-provided upload info (key, filename)
     * @return {@code false} if the uploaded object has no ETag, {@code true} otherwise
     */
    @Override
    public boolean completeUpload(String batchId, String fileIndex, BatchFileInfo fileInfo) {
        String fileKey = fileInfo.getKey();
        ObjectMetadata metadata = amazonS3.getObjectMetadata(bucket, fileKey);
        String etag = metadata.getETag();
        if (isEmpty(etag)) {
            return false;
        }
        String newFileKey = bucketPrefix + etag;
        String mimeType = metadata.getContentType();
        String encoding = metadata.getContentEncoding();

        ObjectMetadata newMetadata;
        if (metadata.getContentLength() > lowerThresholdToUseMultipartCopy()) {
            newMetadata = S3Utils.copyFileMultipart(amazonS3, metadata, bucket, fileKey, bucket, newFileKey, true);
        } else {
            newMetadata = S3Utils.copyFile(amazonS3, metadata, bucket, fileKey, bucket, newFileKey, true);
            boolean isMultipartUpload = REGEX_MULTIPART_ETAG.matcher(etag).find();
            if (isMultipartUpload) {
                // the original upload was multipart, so its ETag is not an MD5; the single-part copy above
                // produced a real MD5 ETag — re-copy to a key addressed by that digest
                etag = newMetadata.getETag();
                String previousFileKey = newFileKey;
                newFileKey = bucketPrefix + etag;
                newMetadata = S3Utils.copyFile(amazonS3, metadata, bucket, previousFileKey, bucket, newFileKey, true);
            }
        }

        String filename = fileInfo.getFilename();
        long length = newMetadata.getContentLength();
        String digest = newMetadata.getContentMD5() != null ? newMetadata.getContentMD5() : etag;
        String blobProviderId = transientStoreName; // TODO decouple this
        Binary binary = new LazyBinary(digest, blobProviderId, null);
        Blob blob = new BinaryBlob(binary, digest, filename, mimeType, encoding, digest, length);
        Batch batch = getBatch(batchId);
        batch.addFile(fileIndex, blob, filename, mimeType);

        return true;
    }

    /**
     * Size above which the server-side copy must use the multipart copy API.
     *
     * @return the threshold in bytes
     */
    protected long lowerThresholdToUseMultipartCopy() {
        return NON_MULTIPART_COPY_MAX_SIZE;
    }

}