/*
 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Mathieu Guillaume
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql;

import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServletRequest;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.runtime.api.Framework;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.google.common.base.MoreObjects;

/**
 * A Binary Manager that stores binaries as S3 BLOBs.
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
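 * <p>
 * When configured through framework properties, the keys are the {@code nuxeo.s3storage} prefix followed by the
 * property names defined by the {@code *_PROPERTY} constants below; an illustrative example (placeholder values):
 *
 * <pre>
 * nuxeo.s3storage.bucket=my-bucket
 * nuxeo.s3storage.region=us-east-1
 * nuxeo.s3storage.awsid=YOUR_AWS_ACCESS_KEY_ID
 * nuxeo.s3storage.awssecret=YOUR_AWS_SECRET_ACCESS_KEY
 * </pre>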
 */
public class S3BinaryManager extends AbstractCloudBinaryManager {

    private static final String MD5 = "MD5"; // must be MD5 for Etag

    @Override
    protected String getDefaultDigestAlgorithm() {
        return MD5;
    }

    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";

    public static final String BUCKET_NAME_PROPERTY = "bucket";

    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

    public static final String BUCKET_REGION_PROPERTY = "region";

    public static final String DEFAULT_BUCKET_REGION = "us-east-1"; // US East

    public static final String AWS_ID_PROPERTY = "awsid";

    public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID";

    public static final String AWS_SECRET_PROPERTY = "awssecret";

    public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY";

    /** AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_PROPERTY = "connection.max";

    /** AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";

    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";

    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";

    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";

    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";

    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";

    public static final String ENDPOINT_PROPERTY = "endpoint";

    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";

    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";

    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    protected String bucketName;

    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    protected EncryptionMaterials encryptionMaterials;

    protected boolean isEncrypted;

    protected CryptoConfiguration cryptoConfiguration;

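    /** True to request S3 server-side encryption (SSE, AES-256) on uploads, as opposed to client-side encryption. */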
    protected boolean userServerSideEncryption;

    protected AmazonS3 amazonS3;

    protected TransferManager transferManager;

    @Override
    public void close() {
        // this also shuts down the AmazonS3Client
        transferManager.shutdownNow();
        super.close();
    }

    /**
     * Aborts uploads that crashed and are older than 1 day.
     *
     * @since 7.2
     */
    protected void abortOldUploads() throws IOException {
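        // Multipart uploads that were never completed keep their already-uploaded parts in the bucket until they
        // are explicitly aborted, so abort any that started more than a day ago and are presumed crashed.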
        int oneDay = 1000 * 60 * 60 * 24;
        try {
            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
        } catch (AmazonS3Exception e) {
            if (e.getStatusCode() == 400 || e.getStatusCode() == 404) {
                log.error("Your cloud provider does not support aborting old uploads");
                return;
            }
            throw new IOException("Failed to abort old uploads", e);
        }
    }

    @Override
    protected void setupCloudClient() throws IOException {
        // Get settings from the configuration
        bucketName = getProperty(BUCKET_NAME_PROPERTY);
        bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY);
        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
        if (isBlank(bucketRegion)) {
            bucketRegion = DEFAULT_BUCKET_REGION;
        }
        String awsID = getProperty(AWS_ID_PROPERTY);
        String awsSecret = getProperty(AWS_SECRET_PROPERTY);

        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);

        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);

        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
        String endpoint = getProperty(ENDPOINT_PROPERTY);
        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
        if (isNotBlank(sseprop)) {
            userServerSideEncryption = Boolean.parseBoolean(sseprop);
        }

        // Fallback on default env keys for ID and secret
        if (isBlank(awsID)) {
            awsID = System.getenv(AWS_ID_ENV);
        }
        if (isBlank(awsSecret)) {
            awsSecret = System.getenv(AWS_SECRET_ENV);
        }

        if (isBlank(bucketName)) {
            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
        }

        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) {
            log.warn(String.format("%s=%s: S3 bucket prefix should end with '/', it was added automatically.",
                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
            bucketNamePrefix += "/";
        }
        // set up credentials
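        // If no explicit key/secret was configured (properties or environment), fall back to the EC2 instance
        // profile (IAM role) credentials, failing fast if those cannot be obtained either.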
        if (isBlank(awsID) || isBlank(awsSecret)) {
            awsCredentialsProvider = InstanceProfileCredentialsProvider.getInstance();
            try {
                awsCredentialsProvider.getCredentials();
            } catch (AmazonClientException e) {
                throw new RuntimeException("Missing AWS credentials and no instance role found", e);
            }
        } else {
            awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret);
        }

        // set up client configuration
        clientConfiguration = new ClientConfiguration();
        if (isNotBlank(proxyHost)) {
            clientConfiguration.setProxyHost(proxyHost);
        }
        if (isNotBlank(proxyPort)) {
            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
        }
        if (isNotBlank(proxyLogin)) {
            clientConfiguration.setProxyUsername(proxyLogin);
        }
        if (proxyPassword != null) { // could be blank
            clientConfiguration.setProxyPassword(proxyPassword);
        }
        if (maxConnections > 0) {
            clientConfiguration.setMaxConnections(maxConnections);
        }
        if (maxErrorRetry >= 0) { // 0 is allowed
            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
        }
        if (connectionTimeout >= 0) { // 0 is allowed
            clientConfiguration.setConnectionTimeout(connectionTimeout);
        }
        if (socketTimeout >= 0) { // 0 is allowed
            clientConfiguration.setSocketTimeout(socketTimeout);
        }

        // set up encryption
        encryptionMaterials = null;
        if (isNotBlank(keystoreFile)) {
            boolean confok = true;
            if (keystorePass == null) { // could be blank
                log.error("Keystore password missing");
                confok = false;
            }
            if (isBlank(privkeyAlias)) {
                log.error("Key alias missing");
                confok = false;
            }
            if (privkeyPass == null) { // could be blank
                log.error("Key password missing");
                confok = false;
            }
            if (!confok) {
                throw new RuntimeException("S3 Crypto configuration incomplete");
            }
            try {
                // Open the keystore, with try-with-resources so the stream is closed even if loading fails
                KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
                try (FileInputStream ksStream = new FileInputStream(new File(keystoreFile))) {
                    keystore.load(ksStream, keystorePass.toCharArray());
                }
                // Get keypair for alias
                if (!keystore.isKeyEntry(privkeyAlias)) {
                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
                }
                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
                Certificate cert = keystore.getCertificate(privkeyAlias);
                PublicKey pubKey = cert.getPublicKey();
                KeyPair keypair = new KeyPair(pubKey, privKey);
                // Get encryptionMaterials from keypair
                encryptionMaterials = new EncryptionMaterials(keypair);
                cryptoConfiguration = new CryptoConfiguration();
            } catch (IOException | GeneralSecurityException e) {
                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
            }
        }
        isEncrypted = encryptionMaterials != null;

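        // Build the S3 client: use the client-side encryption client when encryption materials are configured,
        // otherwise the standard client; an explicit endpoint (e.g. an S3-compatible store) takes precedence over
        // the region setting.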
        @SuppressWarnings("rawtypes")
        AwsClientBuilder s3Builder;
        if (!isEncrypted) {
            s3Builder = AmazonS3ClientBuilder.standard()
                            .withCredentials(awsCredentialsProvider)
                            .withClientConfiguration(clientConfiguration);
        } else {
            s3Builder = AmazonS3EncryptionClientBuilder.standard()
                            .withClientConfiguration(clientConfiguration)
                            .withCryptoConfiguration(cryptoConfiguration)
                            .withCredentials(awsCredentialsProvider)
                            .withEncryptionMaterials(new StaticEncryptionMaterialsProvider(encryptionMaterials));
        }
        if (isNotBlank(endpoint)) {
            s3Builder = s3Builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, bucketRegion));
        } else {
            s3Builder = s3Builder.withRegion(bucketRegion);
        }

        amazonS3 = (AmazonS3) s3Builder.build();

        // Create the bucket if it doesn't exist, and keep it private
        try {
            if (!amazonS3.doesBucketExist(bucketName)) {
                amazonS3.createBucket(bucketName);
                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
            }
        } catch (AmazonClientException e) {
            throw new IOException(e);
        }

        // compat for NXP-17895, using "downloadfroms3", to be removed
        // these two fields have already been initialized by the base class initialize()
        // using standard property "directdownload"
        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
        if (dd != null) {
            directDownload = Boolean.parseBoolean(dd);
        }
        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
        if (dde >= 0) {
            directDownloadExpire = dde;
        }

        transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build();
        abortOldUploads();
    }

    protected void removeBinary(String digest) {
        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
    }

    @Override
    protected String getSystemPropertyPrefix() {
        return SYSTEM_PROPERTY_PREFIX;
    }

    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new S3BinaryGarbageCollector(this);
    }

    @Override
    public void removeBinaries(Collection<String> digests) {
        digests.forEach(this::removeBinary);
    }

    protected static boolean isMissingKey(AmazonClientException e) {
        if (e instanceof AmazonServiceException) {
            AmazonServiceException ase = (AmazonServiceException) e;
            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
                    || "Not Found".equals(e.getMessage());
        }
        return false;
    }

    public static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    /**
     * Used in the healthCheck; the transferManager should be initialized and the bucket accessible
     *
     * @since 9.3
     */
    public boolean canAccessBucket() {
        return transferManager != null && transferManager.getAmazonS3Client().doesBucketExist(bucketName);
    }

    @Override
    protected FileStorage getFileStorage() {
        return new S3FileStorage();
    }

    public class S3FileStorage implements FileStorage {

        @Override
        public void storeFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("storing blob " + digest + " to S3");
            }
            String key = bucketNamePrefix + digest;
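            // The key is the content digest, so if the metadata lookup below succeeds the exact same content is
            // already stored and the upload can be skipped.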
            try {
                amazonS3.getObjectMetadata(bucketName, key);
                if (log.isDebugEnabled()) {
                    log.debug("blob " + digest + " is already in S3");
                }
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                // not already present -> store the blob
                PutObjectRequest request;
                if (!isEncrypted) {
                    request = new PutObjectRequest(bucketName, key, file);
                    if (userServerSideEncryption) {
                        ObjectMetadata objectMetadata = new ObjectMetadata();
                        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
                        request.setMetadata(objectMetadata);
                    }
                } else {
                    request = new EncryptedPutObjectRequest(bucketName, key, file);
                }
                Upload upload = transferManager.upload(request);
                try {
                    upload.waitForUploadResult();
                } catch (AmazonClientException ee) {
                    throw new IOException(ee);
                } catch (InterruptedException ee) {
                    // reset interrupted status
                    Thread.currentThread().interrupt();
                    // continue interrupt
                    throw new RuntimeException(ee);
                } finally {
                    if (log.isDebugEnabled()) {
                        long dtms = System.currentTimeMillis() - t0;
                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
                    }
                }
            }
        }

        @Override
        public boolean fetchFile(String digest, File file) throws IOException {
            long t0 = 0;
            if (log.isDebugEnabled()) {
                t0 = System.currentTimeMillis();
                log.debug("fetching blob " + digest + " from S3");
            }
            try {
                Download download = transferManager.download(
                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
                download.waitForCompletion();
                // The ETag is the MD5 of the content only for single-part, non-encrypted uploads
                if (!isEncrypted && !digest.equals(download.getObjectMetadata().getETag())) {
                    // for multipart uploads the ETag is not the MD5, so verify the downloaded file explicitly
                    String currentDigest;
                    try (FileInputStream input = new FileInputStream(file)) {
                        currentDigest = DigestUtils.md5Hex(input);
                    }
                    if (!currentDigest.equals(digest)) {
                        log.error("Invalid ETag in S3, currentDigest=" + currentDigest + " expectedDigest=" + digest);
                        throw new IOException("Invalid S3 object, it is corrupted: expected digest " + digest
                                + " but got " + currentDigest);
                    }
                }
                return true;
            } catch (AmazonClientException e) {
                if (!isMissingKey(e)) {
                    throw new IOException(e);
                }
                return false;
            } catch (InterruptedException e) {
                // reset interrupted status
                Thread.currentThread().interrupt();
                // continue interrupt
                throw new RuntimeException(e);
            } finally {
                if (log.isDebugEnabled()) {
                    long dtms = System.currentTimeMillis() - t0;
                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
                }
            }
        }
    }

    /**
     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
     */
    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {

        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
            super(binaryManager);
        }

        @Override
        public String getId() {
            return "s3:" + binaryManager.bucketName;
        }

        @Override
        public Set<String> getUnmarkedBlobs() {
            // list S3 objects in the bucket
            // record those not marked
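            // the listing is paginated: keep calling listNextBatchOfObjects() until it is no longer truncated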
            Set<String> unmarked = new HashSet<>();
            ObjectListing list = null;
            do {
                if (list == null) {
                    list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix);
                } else {
                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
                }
                int prefixLength = binaryManager.bucketNamePrefix.length();
                for (S3ObjectSummary summary : list.getObjectSummaries()) {
                    String digest = summary.getKey().substring(prefixLength);
                    if (!isMD5(digest)) {
                        // ignore files that cannot be MD5 digests for safety
                        continue;
                    }
                    long length = summary.getSize();
                    if (marked.contains(digest)) {
                        status.numBinaries++;
                        status.sizeBinaries += length;
                    } else {
                        status.numBinariesGC++;
                        status.sizeBinariesGC += length;
                        // record file to delete
                        unmarked.add(digest);
                        marked.remove(digest); // optimize memory
                    }
                }
            } while (list.isTruncated());

            return unmarked;
        }
    }

    // ******************** BlobProvider ********************

    @Override
    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
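        // Direct download: generate a presigned S3 GET URL, valid for directDownloadExpire seconds, overriding the
        // response Content-Type and Content-Disposition headers so the browser serves the blob correctly.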
        String key = bucketNamePrefix + digest;
        Date expiration = new Date();
        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null));
        request.setExpiration(expiration);
        URL url = amazonS3.generatePresignedUrl(request);
        try {
            return url.toURI();
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }

}