/*
 * (C) Copyright 2011-2020 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Luís Duarte
 *     Florent Guillaume
 *     Mickaël Schoentgen
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
import static org.apache.commons.lang3.StringUtils.defaultIfEmpty;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.ACCELERATE_MODE_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_ID_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SECRET_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.AWS_SESSION_TOKEN_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_NAME_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_PREFIX_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.BUCKET_REGION_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.ENDPOINT_PROPERTY;
import static org.nuxeo.ecm.core.storage.sql.S3BinaryManager.PATHSTYLEACCESS_PROPERTY;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.server.jaxrs.batch.Batch;
import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.AbstractBatchHandler;
import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchFileInfo;
import org.nuxeo.ecm.blob.s3.S3ManagedTransfer;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.BlobInfo;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.BlobStoreBlobProvider;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.aws.NuxeoAWSRegionProvider;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.Credentials;

/**
 * Batch Handler allowing direct S3 upload.
 * <p>
 * Hands out temporary STS credentials (scoped by the configured role and policy) so that clients can upload files
 * directly to the configured S3 bucket, then registers each uploaded object as a blob of the batch once the client
 * reports the upload as complete.
 *
 * @since 10.1
 */
public class S3DirectBatchHandler extends AbstractBatchHandler {

    private static final Log log = LogFactory.getLog(S3DirectBatchHandler.class);

    // properties passed at initialization time from extension point

    /** @deprecated since 11.1, use {@link S3BinaryManager#ACCELERATE_MODE_PROPERTY} */
    @Deprecated
    public static final String ACCELERATE_MODE_ENABLED_PROPERTY = "accelerateMode";

    public static final String POLICY_TEMPLATE_PROPERTY = "policyTemplate";

    /**
     * @since 10.10
     */
    public static final String ROLE_ARN_PROPERTY = "roleArn";

    /**
     * @since 11.1
     */
    public static final String BLOB_PROVIDER_ID_PROPERTY = "blobProvider";

    // keys in the batch properties, returned to the client

    public static final String INFO_AWS_SECRET_KEY_ID = "awsSecretKeyId";

    public static final String INFO_AWS_SECRET_ACCESS_KEY = "awsSecretAccessKey";

    public static final String INFO_AWS_SESSION_TOKEN = "awsSessionToken";

    public static final String INFO_BUCKET = "bucket";

    public static final String INFO_BASE_KEY = "baseKey";

    public static final String INFO_EXPIRATION = "expiration";

    /** @since 11.1 */
    public static final String INFO_AWS_ENDPOINT = "endpoint";

    /** @since 11.1 */
    public static final String INFO_AWS_PATH_STYLE_ACCESS = "usePathStyleAccess";

    public static final String INFO_AWS_REGION = "region";

    public static final String INFO_USE_S3_ACCELERATE = "useS3Accelerate";

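    // clients and configuration values resolved at initialization time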
    protected AWSSecurityTokenService stsClient;

    protected AmazonS3 amazonS3;

    protected String endpoint;

    protected boolean pathStyleAccessEnabled;

    protected String region;

    protected String bucket;

    protected String bucketPrefix;

    protected boolean accelerateModeEnabled;

    protected int expiration;

    protected String policy;

    protected String roleArn;

    protected boolean useServerSideEncryption;

    protected String serverSideKMSKeyID;

    protected String blobProviderId;

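    /**
     * Reads the configuration contributed on the extension point, resolves the AWS credentials and initializes the
     * STS and S3 clients.
     */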
    @Override
    protected void initialize(Map<String, String> properties) {
        super.initialize(properties);
        endpoint = properties.get(ENDPOINT_PROPERTY);
        pathStyleAccessEnabled = Boolean.parseBoolean(properties.get(PATHSTYLEACCESS_PROPERTY));
        region = properties.get(BUCKET_REGION_PROPERTY);
        if (isBlank(region)) {
            region = NuxeoAWSRegionProvider.getInstance().getRegion();
        }
        bucket = properties.get(BUCKET_NAME_PROPERTY);
        if (isBlank(bucket)) {
            throw new NuxeoException("Missing configuration property: " + BUCKET_NAME_PROPERTY);
        }
        roleArn = properties.get(ROLE_ARN_PROPERTY);
        if (isBlank(roleArn)) {
            throw new NuxeoException("Missing configuration property: " + ROLE_ARN_PROPERTY);
        }
        bucketPrefix = defaultString(properties.get(BUCKET_PREFIX_PROPERTY));
        accelerateModeEnabled = Boolean.parseBoolean(properties.get(ACCELERATE_MODE_PROPERTY));
        String awsSecretKeyId = properties.get(AWS_ID_PROPERTY);
        String awsSecretAccessKey = properties.get(AWS_SECRET_PROPERTY);
        String awsSessionToken = properties.get(AWS_SESSION_TOKEN_PROPERTY);
        expiration = Integer.parseInt(defaultIfEmpty(properties.get(INFO_EXPIRATION), "0"));
        policy = properties.get(POLICY_TEMPLATE_PROPERTY);

        useServerSideEncryption = Boolean.parseBoolean(properties.get(S3BinaryManager.SERVERSIDE_ENCRYPTION_PROPERTY));
        serverSideKMSKeyID = properties.get(S3BinaryManager.SERVERSIDE_ENCRYPTION_KMS_KEY_PROPERTY);

        AWSCredentialsProvider credentials = S3Utils.getAWSCredentialsProvider(awsSecretKeyId, awsSecretAccessKey,
                awsSessionToken);
        stsClient = initializeSTSClient(credentials);
        amazonS3 = initializeS3Client(credentials);

        if (!isBlank(bucketPrefix) && !bucketPrefix.endsWith("/")) {
            log.debug(String.format("%s=%s: S3 bucket prefix should end with '/', appending it automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketPrefix));
            bucketPrefix += "/";
        }

        blobProviderId = defaultString(properties.get(BLOB_PROVIDER_ID_PROPERTY), transientStoreName);
    }

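    /** Builds the STS client used to deliver temporary credentials to direct-upload clients. */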
    protected AWSSecurityTokenService initializeSTSClient(AWSCredentialsProvider credentials) {
        AWSSecurityTokenServiceClientBuilder builder = AWSSecurityTokenServiceClientBuilder.standard();
        initializeBuilder(builder, credentials);
        return builder.build();
    }

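    /** Builds the S3 client used to read object metadata and to check or clean up uploaded objects. */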
    protected AmazonS3 initializeS3Client(AWSCredentialsProvider credentials) {
        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
        initializeBuilder(builder, credentials);
        builder.setPathStyleAccessEnabled(pathStyleAccessEnabled);
        builder.setAccelerateModeEnabled(accelerateModeEnabled);
        return builder.build();
    }

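    /** Applies the shared endpoint/region and credentials configuration to an AWS client builder. */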
    protected void initializeBuilder(AwsClientBuilder<?, ?> builder, AWSCredentialsProvider credentials) {
        if (isBlank(endpoint)) {
            builder.setRegion(region);
        } else {
            builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region));
        }
        builder.setCredentials(credentials);
    }

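    /**
     * Returns the batch, with the temporary AWS credentials and the S3 target (bucket, base key, endpoint, region)
     * exposed to the client through the batch properties.
     */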
    @Override
    public Batch getBatch(String batchId) {
        Map<String, Serializable> parameters = getBatchParameters(batchId);
        if (parameters == null) {
            return null;
        }

        // create the batch
        Batch batch = new Batch(batchId, parameters, getName(), getTransientStore());

        Credentials credentials = getAwsCredentials(batchId);

        Map<String, Object> properties = batch.getProperties();
        properties.put(INFO_AWS_SECRET_KEY_ID, credentials.getAccessKeyId());
        properties.put(INFO_AWS_SECRET_ACCESS_KEY, credentials.getSecretAccessKey());
        properties.put(INFO_AWS_SESSION_TOKEN, credentials.getSessionToken());
        properties.put(INFO_BUCKET, bucket);
        properties.put(INFO_BASE_KEY, bucketPrefix);
        properties.put(INFO_EXPIRATION, credentials.getExpiration().toInstant().toEpochMilli());
        properties.put(INFO_AWS_ENDPOINT, defaultIfBlank(endpoint, null));
        properties.put(INFO_AWS_PATH_STYLE_ACCESS, pathStyleAccessEnabled);
        properties.put(INFO_AWS_REGION, region);
        properties.put(INFO_USE_S3_ACCELERATE, accelerateModeEnabled);

        return batch;
    }

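    /** Exchanges the configured role for temporary credentials through STS. */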
    protected Credentials assumeRole(AssumeRoleRequest request) {
        return stsClient.assumeRole(request).getCredentials();
    }

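    /**
     * Called once the client has finished uploading the object to S3: reads the object metadata, moves the object to
     * a digest-based key when de-duplication is enabled, and registers the resulting blob with the batch.
     */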
    @Override
    public boolean completeUpload(String batchId, String fileIndex, BatchFileInfo fileInfo) {
        String fileKey = fileInfo.getKey();
        ObjectMetadata metadata = amazonS3.getObjectMetadata(bucket, fileKey);

        String key;
        String eTag = metadata.getETag();

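        // without de-duplication the client-chosen key is kept as is;
        // otherwise the object is moved under a key derived from its digest (ETag)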
        if (!useDeDuplication()) {
            key = StringUtils.removeStart(fileKey, bucketPrefix);
        } else {
            key = eTag;
            if (isEmpty(key)) {
                return false;
            }
            String bucketKey = bucketPrefix + key;
            CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, fileKey, bucket, bucketKey);
            // server-side encryption
            if (useServerSideEncryption) {
                if (isNotBlank(serverSideKMSKeyID)) {
                    // SSE-KMS
                    SSEAwsKeyManagementParams params = new SSEAwsKeyManagementParams(serverSideKMSKeyID);
                    copyObjectRequest.setSSEAwsKeyManagementParams(params);
                } else {
                    // SSE-S3
                    ObjectMetadata newObjectMetadata = new ObjectMetadata();
                    newObjectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                    copyObjectRequest.setNewObjectMetadata(newObjectMetadata);
                }
                // TODO SSE-C
            }

            eTag = doMove(copyObjectRequest);

            // if the original upload was multipart but the object is small enough for a single-part copy,
            // the copy's ETag is the content MD5 digest, which we use as the final key
            boolean isMultipartUpload = key.matches(".+-\\d+$");
            boolean canDoNonMultipartCopy = metadata.getContentLength() < getTransferManager().getConfiguration()
                                                                                              .getMultipartCopyThreshold();
            if (isMultipartUpload && canDoNonMultipartCopy) {
                key = eTag;
                String previousBucketKey = bucketKey;
                bucketKey = bucketPrefix + key;
                if (amazonS3.doesObjectExist(bucket, bucketKey)) {
                    // another thread has uploaded the same blob and has done the move
                    // clean up remaining S3 object if needed
                    deleteObjectAfterMove(previousBucketKey);
                } else {
                    // move S3 object to eTag as key
                    var renameRequest = new CopyObjectRequest(bucket, previousBucketKey, bucket, bucketKey);
                    eTag = doMove(renameRequest);
                }
            }
        }

        BlobInfo blobInfo = new BlobInfo();
        blobInfo.mimeType = metadata.getContentType();
        blobInfo.encoding = metadata.getContentEncoding();
        blobInfo.filename = fileInfo.getFilename();
        blobInfo.length = metadata.getContentLength();
        blobInfo.key = key;
        blobInfo.digest = defaultString(metadata.getContentMD5(), eTag);

        Blob blob;

        try {
            blob = getBlobProvider().readBlob(blobInfo);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }

        Batch batch = getBatch(batchId);
        try {
            batch.addFile(fileIndex, blob, blob.getFilename(), blob.getMimeType());
        } catch (NuxeoException e) {
            try {
                amazonS3.deleteObject(bucket, fileKey);
            } catch (AmazonS3Exception s3E) {
                e.addSuppressed(s3E);
            }
            throw e;
        }

        return true;
    }

    /**
     * Copies the source object to the destination key through the transfer manager, then deletes the source object.
     *
     * @return the ETag of the copied object
     */
    protected String doMove(CopyObjectRequest request) {
        Copy rename = getTransferManager().copy(request);
        try {
            var copyResult = rename.waitForCopyResult();
            return copyResult.getETag();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        } finally {
            deleteObjectAfterMove(request.getSourceKey());
        }
    }

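    /** Best-effort deletion of the source object once it has been copied to its final key. */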
    protected void deleteObjectAfterMove(String key) {
        try {
            amazonS3.deleteObject(bucket, key);
        } catch (AmazonS3Exception e) {
            log.debug("Unable to clean up object, move has already been done", e);
        }
    }

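    /** Returns the blob provider backing this handler, by default the one named after the transient store. */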
    protected BlobProvider getBlobProvider() {
        return Framework.getService(BlobManager.class).getBlobProvider(blobProviderId);
    }

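    /** Checks whether the target blob provider stores blobs under digest-based (de-duplicated) keys. */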
    protected boolean useDeDuplication() {
        BlobProvider blobProvider = getBlobProvider();
        if (blobProvider instanceof BlobStoreBlobProvider) {
            return ((BlobStoreBlobProvider) blobProvider).getKeyStrategy().useDeDuplication();
        }
        return true;
    }

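    /** Returns the {@link TransferManager} of the blob provider, which must implement {@link S3ManagedTransfer}. */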
    protected TransferManager getTransferManager() {
        BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderId);
        if (!(blobProvider instanceof S3ManagedTransfer)) {
            throw new NuxeoException("BlobProvider does not implement S3ManagedTransfer");
        }
        return ((S3ManagedTransfer) blobProvider).getTransferManager();
    }

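    /**
     * Requests temporary credentials from STS for the given batch, scoped by the configured role, policy template and
     * expiration, using the batch id as the role session name.
     */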
    protected Credentials getAwsCredentials(String batchId) {
        AssumeRoleRequest request = new AssumeRoleRequest().withRoleArn(roleArn)
                                                           .withPolicy(policy)
                                                           .withRoleSessionName(batchId);
        if (expiration > 0) {
            request.setDurationSeconds(expiration);
        }
        return assumeRole(request);
    }

    /** @since 11.1 */
    @Override
    public Map<String, Object> refreshToken(String batchId) {
        Objects.requireNonNull(batchId, "batchId is required");

        Credentials credentials = getAwsCredentials(batchId);
        Map<String, Object> result = new HashMap<>();
        result.put(INFO_AWS_SECRET_KEY_ID, credentials.getAccessKeyId());
        result.put(INFO_AWS_SECRET_ACCESS_KEY, credentials.getSecretAccessKey());
        result.put(INFO_AWS_SESSION_TOKEN, credentials.getSessionToken());
        result.put(INFO_EXPIRATION, credentials.getExpiration().toInstant().toEpochMilli());
        return result;
    }

}