001/*
002 * (C) Copyright 2011-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Luís Duarte
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.sql;
021
022import static org.apache.commons.lang3.StringUtils.defaultIfEmpty;
023import static org.apache.commons.lang3.StringUtils.defaultString;
024import static org.apache.commons.lang3.StringUtils.isBlank;
025import static org.apache.commons.lang3.StringUtils.isEmpty;
026import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_ID_PROPERTY;
027import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SECRET_PROPERTY;
028import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_NAME_PROPERTY;
029import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_PREFIX_PROPERTY;
030import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_REGION_PROPERTY;
031import static org.nuxeo.ecm.core.storage.sql.S3Utils.NON_MULTIPART_COPY_MAX_SIZE;
032
033import java.io.Serializable;
034import java.util.Arrays;
035import java.util.List;
036import java.util.Map;
037import java.util.regex.Pattern;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.ecm.automation.server.jaxrs.batch.Batch;
042import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.AbstractBatchHandler;
043import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchFileInfo;
044import org.nuxeo.ecm.core.api.Blob;
045import org.nuxeo.ecm.core.api.NuxeoException;
046import org.nuxeo.ecm.core.blob.binary.Binary;
047import org.nuxeo.ecm.core.blob.binary.BinaryBlob;
048import org.nuxeo.ecm.core.blob.binary.LazyBinary;
049
050import com.amazonaws.auth.AWSCredentialsProvider;
051import com.amazonaws.services.s3.AmazonS3;
052import com.amazonaws.services.s3.AmazonS3ClientBuilder;
053import com.amazonaws.services.s3.model.ObjectMetadata;
054import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
055import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
056import com.amazonaws.services.securitytoken.model.Credentials;
057import com.amazonaws.services.securitytoken.model.GetFederationTokenRequest;
058
059/**
060 * Batch Handler allowing direct S3 upload.
061 *
062 * @since 10.1
063 */
064public class S3DirectBatchHandler extends AbstractBatchHandler {
065
066    private static final Log log = LogFactory.getLog(S3BinaryManager.class);
067
068    protected static final Pattern REGEX_MULTIPART_ETAG = Pattern.compile("-\\d+$");
069
070    protected static final Pattern REGEX_BUCKET_PATH_PLACE_HOLDER = Pattern.compile("\\{\\{bucketPath}}");
071
072    // properties passed at initialization time from extension point
073
074    public static final String ACCELERATE_MODE_ENABLED_PROPERTY = "accelerateMode";
075
076    public static final String POLICY_TEMPLATE_PROPERTY = "policyTemplate";
077
078    protected static final List<String> MANDATORY_PROPERTIES = Arrays.asList( //
079            AWS_ID_PROPERTY, //
080            AWS_SECRET_PROPERTY, //
081            BUCKET_NAME_PROPERTY, //
082            BUCKET_REGION_PROPERTY);
083
084    // keys in the batch properties, returned to the client
085
086    public static final String INFO_AWS_SECRET_KEY_ID = "awsSecretKeyId";
087
088    public static final String INFO_AWS_SECRET_ACCESS_KEY = "awsSecretAccessKey";
089
090    public static final String INFO_AWS_SESSION_TOKEN = "awsSessionToken";
091
092    public static final String INFO_BUCKET = "bucket";
093
094    public static final String INFO_BASE_KEY = "baseKey";
095
096    public static final String INFO_EXPIRATION = "expiration";
097
098    public static final String INFO_AWS_REGION = "region";
099
100    public static final String INFO_USE_S3_ACCELERATE = "useS3Accelerate";
101
102    protected AWSSecurityTokenService stsClient;
103
104    protected AmazonS3 amazonS3;
105
106    protected String region;
107
108    protected String bucket;
109
110    protected String bucketPrefix;
111
112    protected boolean accelerateModeEnabled;
113
114    protected int expiration;
115
116    protected String policy;
117
118    @Override
119    protected void initialize(Map<String, String> properties) {
120        super.initialize(properties);
121        for (String property : MANDATORY_PROPERTIES) {
122            if (isEmpty(properties.get(property))) {
123                throw new NuxeoException("Missing configuration property: " + property);
124            }
125        }
126        region = properties.get(BUCKET_REGION_PROPERTY);
127        bucket = properties.get(BUCKET_NAME_PROPERTY);
128        bucketPrefix = defaultString(properties.get(BUCKET_PREFIX_PROPERTY));
129        accelerateModeEnabled = Boolean.parseBoolean(properties.get(ACCELERATE_MODE_ENABLED_PROPERTY));
130        String awsSecretKeyId = properties.get(AWS_ID_PROPERTY);
131        String awsSecretAccessKey = properties.get(AWS_SECRET_PROPERTY);
132        expiration = Integer.parseInt(defaultIfEmpty(properties.get(INFO_EXPIRATION), "0"));
133        policy = properties.get(POLICY_TEMPLATE_PROPERTY);
134
135        AWSCredentialsProvider credentials = S3Utils.getAWSCredentialsProvider(awsSecretKeyId, awsSecretAccessKey);
136        stsClient = initializeSTSClient(credentials);
137        amazonS3 = initializeS3Client(credentials);
138
139        if (!isBlank(bucketPrefix) && !bucketPrefix.endsWith("/")) {
140            log.warn(String.format("%s %s S3 bucket prefix should end with '/': added automatically.",
141                    BUCKET_PREFIX_PROPERTY, bucketPrefix));
142            bucketPrefix += "/";
143        }
144    }
145
146    protected AWSSecurityTokenService initializeSTSClient(AWSCredentialsProvider credentials) {
147        return AWSSecurityTokenServiceClientBuilder.standard()
148                                                   .withRegion(region)
149                                                   .withCredentials(credentials)
150                                                   .build();
151    }
152
153    protected AmazonS3 initializeS3Client(AWSCredentialsProvider credentials) {
154        return AmazonS3ClientBuilder.standard()
155                                    .withRegion(region)
156                                    .withCredentials(credentials)
157                                    .withAccelerateModeEnabled(accelerateModeEnabled)
158                                    .build();
159    }
160
161    @Override
162    public Batch getBatch(String batchId) {
163        Map<String, Serializable> parameters = getBatchParameters(batchId);
164        if (parameters == null) {
165            return null;
166        }
167
168        // create the batch
169        Batch batch = new Batch(batchId, parameters, getName(), getTransientStore());
170
171        String name = batchId.substring(0, Math.min(32, batchId.length()));
172
173        GetFederationTokenRequest request = new GetFederationTokenRequest().withPolicy(policy).withName(name);
174
175        if (expiration > 0) {
176            request.setDurationSeconds(expiration);
177        }
178
179        Credentials credentials = getSTSCredentials(request);
180
181        Map<String, Object> properties = batch.getProperties();
182        properties.put(INFO_AWS_SECRET_KEY_ID, credentials.getAccessKeyId());
183        properties.put(INFO_AWS_SECRET_ACCESS_KEY, credentials.getSecretAccessKey());
184        properties.put(INFO_AWS_SESSION_TOKEN, credentials.getSessionToken());
185        properties.put(INFO_BUCKET, bucket);
186        properties.put(INFO_BASE_KEY, bucketPrefix);
187        properties.put(INFO_EXPIRATION, credentials.getExpiration().toInstant().toEpochMilli());
188        properties.put(INFO_AWS_REGION, region);
189        properties.put(INFO_USE_S3_ACCELERATE, accelerateModeEnabled);
190
191        return batch;
192    }
193
194    protected Credentials getSTSCredentials(GetFederationTokenRequest request) {
195        return stsClient.getFederationToken(request).getCredentials();
196    }
197
198    @Override
199    public boolean completeUpload(String batchId, String fileIndex, BatchFileInfo fileInfo) {
200        String fileKey = fileInfo.getKey();
201        ObjectMetadata metadata = amazonS3.getObjectMetadata(bucket, fileKey);
202        String etag = metadata.getETag();
203        if (isEmpty(etag)) {
204            return false;
205        }
206        String mimeType = metadata.getContentType();
207        String encoding = metadata.getContentEncoding();
208
209        ObjectMetadata newMetadata;
210        if (metadata.getContentLength() > lowerThresholdToUseMultipartCopy()) {
211            newMetadata = S3Utils.copyFileMultipart(amazonS3, metadata, bucket, fileKey, bucket, etag, true);
212        } else {
213            newMetadata = S3Utils.copyFile(amazonS3, metadata, bucket, fileKey, bucket, etag, true);
214            boolean isMultipartUpload = REGEX_MULTIPART_ETAG.matcher(etag).find();
215            if (isMultipartUpload) {
216                String previousEtag = etag;
217                etag = newMetadata.getETag();
218                newMetadata = S3Utils.copyFile(amazonS3, metadata, bucket, previousEtag, bucket, etag, true);
219            }
220        }
221
222        String blobKey = transientStoreName + ':' + etag;
223        String filename = fileInfo.getFilename();
224        long length = newMetadata.getContentLength();
225        String digest = newMetadata.getContentMD5();
226        String blobProviderId = transientStoreName; // TODO decouple this
227        Binary binary = new LazyBinary(blobKey, blobProviderId, null);
228        Blob blob = new BinaryBlob(binary, blobKey, filename, mimeType, encoding, digest, length);
229        Batch batch = getBatch(batchId);
230        batch.addFile(fileIndex, blob, filename, mimeType);
231
232        return true;
233    }
234
235    protected long lowerThresholdToUseMultipartCopy() {
236        return NON_MULTIPART_COPY_MAX_SIZE;
237    }
238
239}