001/*
002 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mathieu Guillaume
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.sql;
021
022import static org.apache.commons.lang.StringUtils.isBlank;
023import static org.apache.commons.lang.StringUtils.isNotBlank;
024
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.IOException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.net.URL;
031import java.security.GeneralSecurityException;
032import java.security.KeyPair;
033import java.security.KeyStore;
034import java.security.PrivateKey;
035import java.security.PublicKey;
036import java.security.cert.Certificate;
037import java.util.Collection;
038import java.util.Date;
039import java.util.HashSet;
040import java.util.Set;
041import java.util.regex.Pattern;
042
043import javax.servlet.http.HttpServletRequest;
044
045import org.apache.commons.lang.StringUtils;
046import org.apache.commons.logging.Log;
047import org.apache.commons.logging.LogFactory;
048import org.nuxeo.common.Environment;
049import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
050import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
051import org.nuxeo.ecm.core.api.Blob;
052import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo;
053import org.nuxeo.ecm.core.blob.ManagedBlob;
054import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
055import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
056import org.nuxeo.ecm.core.blob.binary.FileStorage;
057import org.nuxeo.ecm.core.model.Document;
058
059import com.amazonaws.AmazonClientException;
060import com.amazonaws.AmazonServiceException;
061import com.amazonaws.ClientConfiguration;
062import com.amazonaws.HttpMethod;
063import com.amazonaws.auth.AWSCredentialsProvider;
064import com.amazonaws.auth.InstanceProfileCredentialsProvider;
065import com.amazonaws.services.s3.AmazonS3;
066import com.amazonaws.services.s3.AmazonS3Client;
067import com.amazonaws.services.s3.AmazonS3EncryptionClient;
068import com.amazonaws.services.s3.internal.ServiceUtils;
069import com.amazonaws.services.s3.model.CannedAccessControlList;
070import com.amazonaws.services.s3.model.CryptoConfiguration;
071import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
072import com.amazonaws.services.s3.model.EncryptionMaterials;
073import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
074import com.amazonaws.services.s3.model.GetObjectRequest;
075import com.amazonaws.services.s3.model.ObjectListing;
076import com.amazonaws.services.s3.model.ObjectMetadata;
077import com.amazonaws.services.s3.model.PutObjectRequest;
078import com.amazonaws.services.s3.model.S3ObjectSummary;
079import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
080import com.amazonaws.services.s3.transfer.TransferManager;
081import com.amazonaws.services.s3.transfer.Upload;
082import com.amazonaws.services.s3.transfer.model.UploadResult;
083import com.google.common.base.MoreObjects;
084
085/**
086 * A Binary Manager that stores binaries as S3 BLOBs
087 * <p>
088 * The BLOBs are cached locally on first access for efficiency.
089 * <p>
090 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
091 * if accessed before the stream.
092 */
093public class S3BinaryManager extends AbstractCloudBinaryManager {
094
095    private static final String MD5 = "MD5"; // must be MD5 for Etag
096
097    @Override
098    protected String getDefaultDigestAlgorithm() {
099        return MD5;
100    }
101
102    private static final Log log = LogFactory.getLog(S3BinaryManager.class);
103
104    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";
105
106    public static final String BUCKET_NAME_PROPERTY = "bucket";
107
108    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";
109
110    public static final String BUCKET_REGION_PROPERTY = "region";
111
112    public static final String DEFAULT_BUCKET_REGION = null; // US East
113
114    public static final String AWS_ID_PROPERTY = "awsid";
115
116    public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID";
117
118    public static final String AWS_SECRET_PROPERTY = "awssecret";
119
120    public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY";
121
122    /** AWS ClientConfiguration default 50 */
123    public static final String CONNECTION_MAX_PROPERTY = "connection.max";
124
125    /** AWS ClientConfiguration default 3 (with exponential backoff) */
126    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";
127
128    /** AWS ClientConfiguration default 50*1000 = 50s */
129    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";
130
131    /** AWS ClientConfiguration default 50*1000 = 50s */
132    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";
133
134    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";
135
136    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";
137
138    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";
139
140    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";
141
142    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";
143
144    public static final String ENDPOINT_PROPERTY = "endpoint";
145
146    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";
147
148    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";
149
150    private static final Pattern MD5_RE = Pattern.compile("(.*/)?[0-9a-f]{32}");
151
152    protected String bucketName;
153
154    protected String bucketNamePrefix;
155
156    protected AWSCredentialsProvider awsCredentialsProvider;
157
158    protected ClientConfiguration clientConfiguration;
159
160    protected EncryptionMaterials encryptionMaterials;
161
162    protected boolean isEncrypted;
163
164    protected CryptoConfiguration cryptoConfiguration;
165
166    protected boolean userServerSideEncryption;
167
168    protected AmazonS3 amazonS3;
169
170    protected TransferManager transferManager;
171
172    @Override
173    public void close() {
174        // this also shuts down the AmazonS3Client
175        transferManager.shutdownNow();
176        super.close();
177    }
178
179    /**
180     * Aborts uploads that crashed and are older than 1 day.
181     *
182     * @since 7.2
183     */
184    protected void abortOldUploads() throws IOException {
185        int oneDay = 1000 * 60 * 60 * 24;
186        try {
187            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
188        } catch (AmazonClientException e) {
189            throw new IOException("Failed to abort old uploads", e);
190        }
191    }
192
193    @Override
194    protected void setupCloudClient() throws IOException {
195        // Get settings from the configuration
196        bucketName = getProperty(BUCKET_NAME_PROPERTY);
197        bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY);
198        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
199        if (isBlank(bucketRegion)) {
200            bucketRegion = DEFAULT_BUCKET_REGION;
201        }
202        String awsID = getProperty(AWS_ID_PROPERTY);
203        String awsSecret = getProperty(AWS_SECRET_PROPERTY);
204
205        String proxyHost = getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
206        String proxyPort = getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
207        String proxyLogin = getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
208        String proxyPassword = getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);
209
210        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
211        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
212        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
213        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);
214
215        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
216        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
217        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
218        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
219        String endpoint = getProperty(ENDPOINT_PROPERTY);
220        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
221        if (isNotBlank(sseprop)) {
222            userServerSideEncryption = Boolean.parseBoolean(sseprop);
223        }
224
225        // Fallback on default env keys for ID and secret
226        if (isBlank(awsID)) {
227            awsID = System.getenv(AWS_ID_ENV);
228        }
229        if (isBlank(awsSecret)) {
230            awsSecret = System.getenv(AWS_SECRET_ENV);
231        }
232
233        if (isBlank(bucketName)) {
234            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
235        }
236
237        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) {
238            log.warn(String.format("%s %s S3 bucket prefix should end by '/' " + ": added automatically.",
239                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
240            bucketNamePrefix += "/";
241        }
242        // set up credentials
243        if (isBlank(awsID) || isBlank(awsSecret)) {
244            awsCredentialsProvider = new InstanceProfileCredentialsProvider();
245            try {
246                awsCredentialsProvider.getCredentials();
247            } catch (AmazonClientException e) {
248                throw new RuntimeException("Missing AWS credentials and no instance role found");
249            }
250        } else {
251            awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret);
252        }
253
254        // set up client configuration
255        clientConfiguration = new ClientConfiguration();
256        if (isNotBlank(proxyHost)) {
257            clientConfiguration.setProxyHost(proxyHost);
258        }
259        if (isNotBlank(proxyPort)) {
260            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
261        }
262        if (isNotBlank(proxyLogin)) {
263            clientConfiguration.setProxyUsername(proxyLogin);
264        }
265        if (proxyPassword != null) { // could be blank
266            clientConfiguration.setProxyPassword(proxyPassword);
267        }
268        if (maxConnections > 0) {
269            clientConfiguration.setMaxConnections(maxConnections);
270        }
271        if (maxErrorRetry >= 0) { // 0 is allowed
272            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
273        }
274        if (connectionTimeout >= 0) { // 0 is allowed
275            clientConfiguration.setConnectionTimeout(connectionTimeout);
276        }
277        if (socketTimeout >= 0) { // 0 is allowed
278            clientConfiguration.setSocketTimeout(socketTimeout);
279        }
280
281        // set up encryption
282        encryptionMaterials = null;
283        if (isNotBlank(keystoreFile)) {
284            boolean confok = true;
285            if (keystorePass == null) { // could be blank
286                log.error("Keystore password missing");
287                confok = false;
288            }
289            if (isBlank(privkeyAlias)) {
290                log.error("Key alias missing");
291                confok = false;
292            }
293            if (privkeyPass == null) { // could be blank
294                log.error("Key password missing");
295                confok = false;
296            }
297            if (!confok) {
298                throw new RuntimeException("S3 Crypto configuration incomplete");
299            }
300            try {
301                // Open keystore
302                File ksFile = new File(keystoreFile);
303                FileInputStream ksStream = new FileInputStream(ksFile);
304                KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
305                keystore.load(ksStream, keystorePass.toCharArray());
306                ksStream.close();
307                // Get keypair for alias
308                if (!keystore.isKeyEntry(privkeyAlias)) {
309                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
310                }
311                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
312                Certificate cert = keystore.getCertificate(privkeyAlias);
313                PublicKey pubKey = cert.getPublicKey();
314                KeyPair keypair = new KeyPair(pubKey, privKey);
315                // Get encryptionMaterials from keypair
316                encryptionMaterials = new EncryptionMaterials(keypair);
317                cryptoConfiguration = new CryptoConfiguration();
318            } catch (IOException | GeneralSecurityException e) {
319                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
320            }
321        }
322        isEncrypted = encryptionMaterials != null;
323
324        // Try to create bucket if it doesn't exist
325        if (!isEncrypted) {
326            amazonS3 = new AmazonS3Client(awsCredentialsProvider, clientConfiguration);
327        } else {
328            amazonS3 = new AmazonS3EncryptionClient(awsCredentialsProvider, new StaticEncryptionMaterialsProvider(
329                    encryptionMaterials), clientConfiguration, cryptoConfiguration);
330        }
331        if (isNotBlank(endpoint)) {
332            amazonS3.setEndpoint(endpoint);
333        }
334
335        try {
336            if (!amazonS3.doesBucketExist(bucketName)) {
337                amazonS3.createBucket(bucketName, bucketRegion);
338                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
339            }
340        } catch (AmazonClientException e) {
341            throw new IOException(e);
342        }
343
344        // compat for NXP-17895, using "downloadfroms3", to be removed
345        // these two fields have already been initialized by the base class initialize()
346        // using standard property "directdownload"
347        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
348        if (dd != null) {
349            directDownload = Boolean.parseBoolean(dd);
350        }
351        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
352        if (dde >= 0) {
353            directDownloadExpire = dde;
354        }
355
356        transferManager = new TransferManager(amazonS3);
357        abortOldUploads();
358    }
359
360    protected void removeBinary(String digest) {
361        amazonS3.deleteObject(bucketName, digest);
362    }
363
364    @Override
365    protected String getSystemPropertyPrefix() {
366        return SYSTEM_PROPERTY_PREFIX;
367    }
368
369    @Override
370    protected BinaryGarbageCollector instantiateGarbageCollector() {
371        return new S3BinaryGarbageCollector(this);
372    }
373
374    @Override
375    public void removeBinaries(Collection<String> digests) {
376        digests.forEach(this::removeBinary);
377    }
378
379    protected static boolean isMissingKey(AmazonClientException e) {
380        if (e instanceof AmazonServiceException) {
381            AmazonServiceException ase = (AmazonServiceException) e;
382            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
383                    || "Not Found".equals(e.getMessage());
384        }
385        return false;
386    }
387
388    public static boolean isMD5(String digest) {
389        return MD5_RE.matcher(digest).matches();
390    }
391
392    @Override
393    protected FileStorage getFileStorage() {
394        return new S3FileStorage();
395    }
396
397    public class S3FileStorage implements FileStorage {
398
399        @Override
400        public void storeFile(String digest, File file) throws IOException {
401            long t0 = 0;
402            if (log.isDebugEnabled()) {
403                t0 = System.currentTimeMillis();
404                log.debug("storing blob " + digest + " to S3");
405            }
406            String etag;
407            String key = bucketNamePrefix + digest;
408            try {
409                ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, key);
410                etag = metadata.getETag();
411                if (log.isDebugEnabled()) {
412                    log.debug("blob " + digest + " is already in S3");
413                }
414            } catch (AmazonClientException e) {
415                if (!isMissingKey(e)) {
416                    throw new IOException(e);
417                }
418                // not already present -> store the blob
419                PutObjectRequest request;
420                if (!isEncrypted) {
421                    request = new PutObjectRequest(bucketName, key, file);
422                    if (userServerSideEncryption) {
423                        ObjectMetadata objectMetadata = new ObjectMetadata();
424                        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
425                        request.setMetadata(objectMetadata);
426                    }
427                } else {
428                    request = new EncryptedPutObjectRequest(bucketName, key, file);
429                }
430                Upload upload = transferManager.upload(request);
431                try {
432                    UploadResult result = upload.waitForUploadResult();
433                    etag = result.getETag();
434                } catch (AmazonClientException ee) {
435                    throw new IOException(ee);
436                } catch (InterruptedException ee) {
437                    // reset interrupted status
438                    Thread.currentThread().interrupt();
439                    // continue interrupt
440                    throw new RuntimeException(ee);
441                } finally {
442                    if (log.isDebugEnabled()) {
443                        long dtms = System.currentTimeMillis() - t0;
444                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
445                    }
446                }
447            }
448            // check transfer went ok
449            if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) {
450                // When the blob is not encrypted by S3, the MD5 remotely
451                // computed by S3 and passed as a Etag should match the locally
452                // computed MD5 digest.
453                // This check cannot be done when encryption is enabled unless
454                // we could replicate that encryption locally just for that
455                // purpose which would add further load and complexity on the
456                // client.
457                throw new IOException("Invalid ETag in S3, ETag=" + etag + " digest=" + digest);
458            }
459        }
460
461        @Override
462        public boolean fetchFile(String digest, File file) throws IOException {
463            long t0 = 0;
464            if (log.isDebugEnabled()) {
465                t0 = System.currentTimeMillis();
466                log.debug("fetching blob " + digest + " from S3");
467            }
468            try {
469
470                ObjectMetadata metadata = amazonS3.getObject(
471                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
472                // check ETag
473                String etag = metadata.getETag();
474                if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) {
475                    log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest);
476                    return false;
477                }
478                return true;
479            } catch (AmazonClientException e) {
480                if (!isMissingKey(e)) {
481                    throw new IOException(e);
482                }
483                return false;
484            } finally {
485                if (log.isDebugEnabled()) {
486                    long dtms = System.currentTimeMillis() - t0;
487                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
488                }
489            }
490
491        }
492
493        @Override
494        public Long fetchLength(String digest) throws IOException {
495            long t0 = 0;
496            if (log.isDebugEnabled()) {
497                t0 = System.currentTimeMillis();
498                log.debug("fetching blob length " + digest + " from S3");
499            }
500            try {
501                ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, bucketNamePrefix + digest);
502                // check ETag
503                String etag = metadata.getETag();
504                if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) {
505                    log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest);
506                    return null;
507                }
508                return Long.valueOf(metadata.getContentLength());
509            } catch (AmazonClientException e) {
510                if (!isMissingKey(e)) {
511                    throw new IOException(e);
512                }
513                return null;
514            } finally {
515                if (log.isDebugEnabled()) {
516                    long dtms = System.currentTimeMillis() - t0;
517                    log.debug("fetched blob length " + digest + " from S3 in " + dtms + "ms");
518                }
519            }
520        }
521    }
522
523    /**
524     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
525     */
526    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {
527
528        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
529            super(binaryManager);
530        }
531
532        @Override
533        public String getId() {
534            return "s3:" + binaryManager.bucketName;
535        }
536
537        @Override
538        public Set<String> getUnmarkedBlobs() {
539            // list S3 objects in the bucket
540            // record those not marked
541            Set<String> unmarked = new HashSet<>();
542            ObjectListing list = null;
543            do {
544                if (list == null) {
545                    list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix);
546                } else {
547                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
548                }
549                for (S3ObjectSummary summary : list.getObjectSummaries()) {
550                    String digest = summary.getKey();
551                    if (!isMD5(digest)) {
552                        // ignore files that cannot be MD5 digests for
553                        // safety
554                        continue;
555                    }
556                    long length = summary.getSize();
557                    if (marked.contains(digest)) {
558                        status.numBinaries++;
559                        status.sizeBinaries += length;
560                    } else {
561                        status.numBinariesGC++;
562                        status.sizeBinariesGC += length;
563                        // record file to delete
564                        unmarked.add(digest);
565                        marked.remove(digest); // optimize memory
566                    }
567                }
568            } while (list.isTruncated());
569
570            return unmarked;
571        }
572    }
573
574    // ******************** BlobProvider ********************
575
576    @Override
577    public Blob readBlob(BlobInfo blobInfo) throws IOException {
578        // just delegate to avoid copy/pasting code
579        return new BinaryBlobProvider(this).readBlob(blobInfo);
580    }
581
582    @Override
583    public String writeBlob(Blob blob, Document doc) throws IOException {
584        // just delegate to avoid copy/pasting code
585        return new BinaryBlobProvider(this).writeBlob(blob, doc);
586    }
587
588    @Override
589    protected boolean isDirectDownload() {
590        return directDownload;
591    }
592
593    @Override
594    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
595        String key = bucketNamePrefix + digest;
596        Date expiration = new Date();
597        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
598        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
599        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
600        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, servletRequest));
601        request.setExpiration(expiration);
602        URL url = amazonS3.generatePresignedUrl(request);
603        try {
604            return url.toURI();
605        } catch (URISyntaxException e) {
606            throw new IOException(e);
607        }
608    }
609
610}