/*
 * (C) Copyright 2011-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Luís Duarte
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang3.StringUtils.defaultIfEmpty;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_ID_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SECRET_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SESSION_TOKEN_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_NAME_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_PREFIX_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_REGION_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3Utils.NON_MULTIPART_COPY_MAX_SIZE;

import java.io.Serializable;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.server.jaxrs.batch.Batch;
import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.AbstractBatchHandler;
import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchFileInfo;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.binary.Binary;
import org.nuxeo.ecm.core.blob.binary.BinaryBlob;
import org.nuxeo.ecm.core.blob.binary.LazyBinary;
import org.nuxeo.runtime.aws.NuxeoAWSRegionProvider;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.Credentials;

/**
 * Batch Handler allowing direct S3 upload.
 *
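 * <p>
 * A representative contribution sketch is shown below. It is illustrative only: the extension point name and the
 * {@code bucket} and {@code region} property keys are assumptions that should be checked against the batch handler
 * extension point and the property constants defined in this class and in {@link S3BinaryManager}; the bucket, region,
 * role ARN, policy and expiration values are placeholders.
 *
 * <pre>{@code
 * <extension target="org.nuxeo.ecm.automation.server.BatchManager" point="handlers">
 *   <batchHandler>
 *     <name>s3direct</name>
 *     <class>org.nuxeo.ecm.core.storage.sql.S3DirectBatchHandler</class>
 *     <property name="bucket">example-upload-bucket</property>
 *     <property name="region">eu-west-1</property>
 *     <property name="roleArn">arn:aws:iam::000000000000:role/example-direct-upload</property>
 *     <property name="policyTemplate">{"Version": "2012-10-17", "Statement": [...]}</property>
 *     <property name="accelerateMode">false</property>
 *     <property name="expiration">3600</property>
 *   </batchHandler>
 * </extension>
 * }</pre>
 *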
 * @since 10.1
 */
public class S3DirectBatchHandler extends AbstractBatchHandler {

    private static final Log log = LogFactory.getLog(S3DirectBatchHandler.class);
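    // S3 reports ETags of the form "<hex>-<partCount>" for objects created through multipart upload; such an ETag is
    // not the MD5 digest of the content, so it cannot be used directly as the blob digest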
    protected static final Pattern REGEX_MULTIPART_ETAG = Pattern.compile("-\\d+$");

    protected static final Pattern REGEX_BUCKET_PATH_PLACE_HOLDER = Pattern.compile("\\{\\{bucketPath}}");

    // properties passed at initialization time from extension point

    public static final String ACCELERATE_MODE_ENABLED_PROPERTY = "accelerateMode";

    public static final String POLICY_TEMPLATE_PROPERTY = "policyTemplate";

    /**
     * @since 10.10
     */
    public static final String ROLE_ARN_PROPERTY = "roleArn";

    // keys in the batch properties, returned to the client

    public static final String INFO_AWS_SECRET_KEY_ID = "awsSecretKeyId";

    public static final String INFO_AWS_SECRET_ACCESS_KEY = "awsSecretAccessKey";

    public static final String INFO_AWS_SESSION_TOKEN = "awsSessionToken";

    public static final String INFO_BUCKET = "bucket";

    public static final String INFO_BASE_KEY = "baseKey";

    public static final String INFO_EXPIRATION = "expiration";

    public static final String INFO_AWS_REGION = "region";

    public static final String INFO_USE_S3_ACCELERATE = "useS3Accelerate";

    protected AWSSecurityTokenService stsClient;

    protected AmazonS3 amazonS3;

    protected String region;

    protected String bucket;

    protected String bucketPrefix;

    protected boolean accelerateModeEnabled;

    protected int expiration;

    protected String policy;

    protected String roleArn;

    @Override
    protected void initialize(Map<String, String> properties) {
        super.initialize(properties);
        region = properties.get(BUCKET_REGION_PROPERTY);
        if (isBlank(region)) {
            region = NuxeoAWSRegionProvider.getInstance().getRegion();
        }
        bucket = properties.get(BUCKET_NAME_PROPERTY);
        if (isBlank(bucket)) {
            throw new NuxeoException("Missing configuration property: " + BUCKET_NAME_PROPERTY);
        }
        roleArn = properties.get(ROLE_ARN_PROPERTY);
        if (isBlank(roleArn)) {
            throw new NuxeoException("Missing configuration property: " + ROLE_ARN_PROPERTY);
        }
        bucketPrefix = defaultString(properties.get(BUCKET_PREFIX_PROPERTY));
        accelerateModeEnabled = Boolean.parseBoolean(properties.get(ACCELERATE_MODE_ENABLED_PROPERTY));
        String awsSecretKeyId = properties.get(AWS_ID_PROPERTY);
        String awsSecretAccessKey = properties.get(AWS_SECRET_PROPERTY);
        String awsSessionToken = properties.get(AWS_SESSION_TOKEN_PROPERTY);
        expiration = Integer.parseInt(defaultIfEmpty(properties.get(INFO_EXPIRATION), "0"));
        policy = properties.get(POLICY_TEMPLATE_PROPERTY);

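        // build the STS and S3 clients used by this handler; the temporary per-batch credentials handed to clients
        // are obtained later in getBatch() through assumeRole()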
        AWSCredentialsProvider credentials = S3Utils.getAWSCredentialsProvider(awsSecretKeyId, awsSecretAccessKey,
                awsSessionToken);
        stsClient = initializeSTSClient(credentials);
        amazonS3 = initializeS3Client(credentials);

        if (!isBlank(bucketPrefix) && !bucketPrefix.endsWith("/")) {
            log.warn(String.format("The S3 bucket prefix (%s=%s) should end with '/'; one was added automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketPrefix));
            bucketPrefix += "/";
        }
    }

    protected AWSSecurityTokenService initializeSTSClient(AWSCredentialsProvider credentials) {
        return AWSSecurityTokenServiceClientBuilder.standard()
                                                   .withRegion(region)
                                                   .withCredentials(credentials)
                                                   .build();
    }

    protected AmazonS3 initializeS3Client(AWSCredentialsProvider credentials) {
        return AmazonS3ClientBuilder.standard()
                                    .withRegion(region)
                                    .withCredentials(credentials)
                                    .withAccelerateModeEnabled(accelerateModeEnabled)
                                    .build();
    }

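    /**
     * Gets the batch with the given id and populates its properties with temporary credentials obtained by assuming
     * the configured IAM role through STS, so that the client can upload files directly to the S3 bucket. Returns
     * {@code null} if no batch exists with this id.
     */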
    @Override
    public Batch getBatch(String batchId) {
        Map<String, Serializable> parameters = getBatchParameters(batchId);
        if (parameters == null) {
            return null;
        }

        // create the batch
        Batch batch = new Batch(batchId, parameters, getName(), getTransientStore());

        AssumeRoleRequest request = new AssumeRoleRequest().withRoleArn(roleArn)
                                                           .withPolicy(policy)
                                                           .withRoleSessionName(batchId);
        if (expiration > 0) {
            request.setDurationSeconds(expiration);
        }

        Credentials credentials = assumeRole(request);

        Map<String, Object> properties = batch.getProperties();
        properties.put(INFO_AWS_SECRET_KEY_ID, credentials.getAccessKeyId());
        properties.put(INFO_AWS_SECRET_ACCESS_KEY, credentials.getSecretAccessKey());
        properties.put(INFO_AWS_SESSION_TOKEN, credentials.getSessionToken());
        properties.put(INFO_BUCKET, bucket);
        properties.put(INFO_BASE_KEY, bucketPrefix);
        properties.put(INFO_EXPIRATION, credentials.getExpiration().toInstant().toEpochMilli());
        properties.put(INFO_AWS_REGION, region);
        properties.put(INFO_USE_S3_ACCELERATE, accelerateModeEnabled);

        return batch;
    }

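    /**
     * Assumes the role described by the given request through STS and returns the resulting temporary credentials.
     */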
    protected Credentials assumeRole(AssumeRoleRequest request) {
        return stsClient.assumeRole(request).getCredentials();
    }

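    /**
     * Called once the client has finished uploading directly to S3: copies the uploaded object to a key derived from
     * its digest (handling multipart-uploaded objects, whose ETag is not an MD5), then registers the resulting blob
     * on the batch. Returns {@code false} if the uploaded object has no usable ETag.
     */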
    @Override
    public boolean completeUpload(String batchId, String fileIndex, BatchFileInfo fileInfo) {
        String fileKey = fileInfo.getKey();
        ObjectMetadata metadata = amazonS3.getObjectMetadata(bucket, fileKey);
        String etag = metadata.getETag();
        if (isEmpty(etag)) {
            return false;
        }
        String newFileKey = bucketPrefix + etag;
        String mimeType = metadata.getContentType();
        String encoding = metadata.getContentEncoding();

        ObjectMetadata newMetadata;
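        // server-side copy of the uploaded object to its digest-based key; large objects go through the multipart
        // copy API, smaller ones through a single copy operation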
        if (metadata.getContentLength() > lowerThresholdToUseMultipartCopy()) {
            newMetadata = S3Utils.copyFileMultipart(amazonS3, metadata, bucket, fileKey, bucket, newFileKey, true);
        } else {
            newMetadata = S3Utils.copyFile(amazonS3, metadata, bucket, fileKey, bucket, newFileKey, true);
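            // an object originally uploaded in multiple parts has a composite ETag that is not an MD5; the single
            // copy above yields a genuine MD5 ETag, so copy once more to re-key the object under that digest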
            boolean isMultipartUpload = REGEX_MULTIPART_ETAG.matcher(etag).find();
            if (isMultipartUpload) {
                etag = newMetadata.getETag();
                String previousFileKey = newFileKey;
                newFileKey = bucketPrefix + etag;
                newMetadata = S3Utils.copyFile(amazonS3, metadata, bucket, previousFileKey, bucket, newFileKey, true);
            }
        }

        String filename = fileInfo.getFilename();
        long length = newMetadata.getContentLength();
        String digest = newMetadata.getContentMD5() != null ? newMetadata.getContentMD5() : etag;
        String blobProviderId = transientStoreName; // TODO decouple this
        Binary binary = new LazyBinary(digest, blobProviderId, null);
        Blob blob = new BinaryBlob(binary, digest, filename, mimeType, encoding, digest, length);
        Batch batch = getBatch(batchId);
        batch.addFile(fileIndex, blob, filename, mimeType);

        return true;
    }

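    /**
     * Gets the object size (in bytes) above which the server-side copy to the digest-based key is done with the
     * multipart copy API instead of a single copy operation.
     */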
    protected long lowerThresholdToUseMultipartCopy() {
        return NON_MULTIPART_COPY_MAX_SIZE;
    }

}