001/*
002 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mathieu Guillaume
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.sql;
021
022import static org.apache.commons.lang.StringUtils.isBlank;
023import static org.apache.commons.lang.StringUtils.isNotBlank;
024
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.IOException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.net.URL;
031import java.security.GeneralSecurityException;
032import java.security.KeyPair;
033import java.security.KeyStore;
034import java.security.PrivateKey;
035import java.security.PublicKey;
036import java.security.cert.Certificate;
037import java.util.ArrayList;
038import java.util.Collection;
039import java.util.Date;
040import java.util.HashSet;
041import java.util.Set;
042import java.util.regex.Pattern;
043
044import javax.servlet.http.HttpServletRequest;
045
046import org.apache.commons.lang.StringUtils;
047import org.apache.commons.logging.Log;
048import org.apache.commons.logging.LogFactory;
049import org.nuxeo.common.Environment;
050import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
051import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
052import org.nuxeo.ecm.core.api.Blob;
053import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo;
054import org.nuxeo.ecm.core.blob.ManagedBlob;
055import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
056import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
057import org.nuxeo.ecm.core.blob.binary.FileStorage;
058import org.nuxeo.ecm.core.model.Document;
059
060import com.amazonaws.AmazonClientException;
061import com.amazonaws.AmazonServiceException;
062import com.amazonaws.ClientConfiguration;
063import com.amazonaws.HttpMethod;
064import com.amazonaws.auth.AWSCredentialsProvider;
065import com.amazonaws.auth.InstanceProfileCredentialsProvider;
066import com.amazonaws.regions.Region;
067import com.amazonaws.regions.Regions;
068import com.amazonaws.services.s3.AmazonS3;
069import com.amazonaws.services.s3.AmazonS3Client;
070import com.amazonaws.services.s3.AmazonS3EncryptionClient;
071import com.amazonaws.services.s3.internal.ServiceUtils;
072import com.amazonaws.services.s3.model.CannedAccessControlList;
073import com.amazonaws.services.s3.model.CryptoConfiguration;
074import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
075import com.amazonaws.services.s3.model.EncryptionMaterials;
076import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
077import com.amazonaws.services.s3.model.GetObjectRequest;
078import com.amazonaws.services.s3.model.ObjectListing;
079import com.amazonaws.services.s3.model.ObjectMetadata;
080import com.amazonaws.services.s3.model.PutObjectRequest;
081import com.amazonaws.services.s3.model.S3ObjectSummary;
082import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
083import com.amazonaws.services.s3.transfer.TransferManager;
084import com.amazonaws.services.s3.transfer.Upload;
085import com.amazonaws.services.s3.transfer.model.UploadResult;
086import com.google.common.base.MoreObjects;
087
088/**
089 * A Binary Manager that stores binaries as S3 BLOBs
090 * <p>
091 * The BLOBs are cached locally on first access for efficiency.
092 * <p>
093 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
094 * if accessed before the stream.
095 */
096public class S3BinaryManager extends AbstractCloudBinaryManager {
097
098    private static final String MD5 = "MD5"; // must be MD5 for Etag
099
100    @Override
101    protected String getDefaultDigestAlgorithm() {
102        return MD5;
103    }
104
    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    /** Prefix returned by {@link #getSystemPropertyPrefix()}. */
    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";

    /** Property holding the name of the S3 bucket; setup fails if it is blank. */
    public static final String BUCKET_NAME_PROPERTY = "bucket";

    /** Property holding the prefix prepended to every digest to build the S3 key; a trailing '/' is added if missing. */
    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

    /** Property holding the bucket region, used when the bucket has to be created. */
    public static final String BUCKET_REGION_PROPERTY = "region";

    /** Region used when {@link #BUCKET_REGION_PROPERTY} is blank. */
    public static final String DEFAULT_BUCKET_REGION = null; // US East

    /** Property holding the AWS access key id. */
    public static final String AWS_ID_PROPERTY = "awsid";

    /** Environment variable read when {@link #AWS_ID_PROPERTY} is blank. */
    public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID";

    /** Property holding the AWS secret access key. */
    public static final String AWS_SECRET_PROPERTY = "awssecret";

    /** Environment variable read when {@link #AWS_SECRET_PROPERTY} is blank. */
    public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY";

    /** AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_PROPERTY = "connection.max";

    /** AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";

    /** Property holding the path of the keystore file; when set, client-side encryption is configured. */
    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";

    /** Property holding the keystore password (may be blank but not missing when a keystore is configured). */
    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";

    /** Property parsed as a boolean to request AES-256 server-side encryption on non client-side-encrypted uploads. */
    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";

    /** Property holding the alias of the key entry to use in the keystore. */
    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";

    /** Property holding the password of the key entry (may be blank but not missing when a keystore is configured). */
    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";

    /** Property holding an explicit endpoint to set on the S3 client. */
    public static final String ENDPOINT_PROPERTY = "endpoint";

    /** Legacy property name (NXP-17895 compatibility) overriding the direct download flag. */
    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";

    /** Legacy property name (NXP-17895 compatibility) overriding the direct download expiration. */
    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";

    /** Matches exactly 32 lowercase hexadecimal characters. */
    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    /** Name of the bucket, read from {@link #BUCKET_NAME_PROPERTY}. */
    protected String bucketName;

    /** Key prefix inside the bucket: empty, or ending with '/'. */
    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    /** Client-side encryption materials, or {@code null} when no keystore is configured. */
    protected EncryptionMaterials encryptionMaterials;

    /** {@code true} when {@link #encryptionMaterials} is set, i.e. client-side encryption is in use. */
    protected boolean isEncrypted;

    /** Only created when a keystore is configured. */
    protected CryptoConfiguration cryptoConfiguration;

    /** Whether uploads request AES-256 server-side encryption; only applied when client-side encryption is off. */
    protected boolean userServerSideEncryption;

    protected AmazonS3 amazonS3;

    protected TransferManager transferManager;
174
175    @Override
176    public void close() {
177        // this also shuts down the AmazonS3Client
178        transferManager.shutdownNow();
179        super.close();
180    }
181
182    /**
183     * Aborts uploads that crashed and are older than 1 day.
184     *
185     * @since 7.2
186     */
187    protected void abortOldUploads() throws IOException {
188        int oneDay = 1000 * 60 * 60 * 24;
189        try {
190            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
191        } catch (AmazonClientException e) {
192            throw new IOException("Failed to abort old uploads", e);
193        }
194    }
195
196    @Override
197    protected void setupCloudClient() throws IOException {
198        // Get settings from the configuration
199        bucketName = getProperty(BUCKET_NAME_PROPERTY);
200        bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY);
201        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
202        if (isBlank(bucketRegion)) {
203            bucketRegion = DEFAULT_BUCKET_REGION;
204        }
205        String awsID = getProperty(AWS_ID_PROPERTY);
206        String awsSecret = getProperty(AWS_SECRET_PROPERTY);
207
208        String proxyHost = getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
209        String proxyPort = getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
210        String proxyLogin = getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
211        String proxyPassword = getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);
212
213        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
214        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
215        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
216        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);
217
218        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
219        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
220        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
221        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
222        String endpoint = getProperty(ENDPOINT_PROPERTY);
223        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
224        if (isNotBlank(sseprop)) {
225            userServerSideEncryption = Boolean.parseBoolean(sseprop);
226        }
227
228        // Fallback on default env keys for ID and secret
229        if (isBlank(awsID)) {
230            awsID = System.getenv(AWS_ID_ENV);
231        }
232        if (isBlank(awsSecret)) {
233            awsSecret = System.getenv(AWS_SECRET_ENV);
234        }
235
236        if (isBlank(bucketName)) {
237            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
238        }
239
240        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) {
241            log.warn(String.format("%s %s S3 bucket prefix should end by '/' " + ": added automatically.",
242                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
243            bucketNamePrefix += "/";
244        }
245        // set up credentials
246        if (isBlank(awsID) || isBlank(awsSecret)) {
247            awsCredentialsProvider = new InstanceProfileCredentialsProvider();
248            try {
249                awsCredentialsProvider.getCredentials();
250            } catch (AmazonClientException e) {
251                throw new RuntimeException("Missing AWS credentials and no instance role found");
252            }
253        } else {
254            awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret);
255        }
256
257        // set up client configuration
258        clientConfiguration = new ClientConfiguration();
259        if (isNotBlank(proxyHost)) {
260            clientConfiguration.setProxyHost(proxyHost);
261        }
262        if (isNotBlank(proxyPort)) {
263            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
264        }
265        if (isNotBlank(proxyLogin)) {
266            clientConfiguration.setProxyUsername(proxyLogin);
267        }
268        if (proxyPassword != null) { // could be blank
269            clientConfiguration.setProxyPassword(proxyPassword);
270        }
271        if (maxConnections > 0) {
272            clientConfiguration.setMaxConnections(maxConnections);
273        }
274        if (maxErrorRetry >= 0) { // 0 is allowed
275            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
276        }
277        if (connectionTimeout >= 0) { // 0 is allowed
278            clientConfiguration.setConnectionTimeout(connectionTimeout);
279        }
280        if (socketTimeout >= 0) { // 0 is allowed
281            clientConfiguration.setSocketTimeout(socketTimeout);
282        }
283
284        // set up encryption
285        encryptionMaterials = null;
286        if (isNotBlank(keystoreFile)) {
287            boolean confok = true;
288            if (keystorePass == null) { // could be blank
289                log.error("Keystore password missing");
290                confok = false;
291            }
292            if (isBlank(privkeyAlias)) {
293                log.error("Key alias missing");
294                confok = false;
295            }
296            if (privkeyPass == null) { // could be blank
297                log.error("Key password missing");
298                confok = false;
299            }
300            if (!confok) {
301                throw new RuntimeException("S3 Crypto configuration incomplete");
302            }
303            try {
304                // Open keystore
305                File ksFile = new File(keystoreFile);
306                FileInputStream ksStream = new FileInputStream(ksFile);
307                KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
308                keystore.load(ksStream, keystorePass.toCharArray());
309                ksStream.close();
310                // Get keypair for alias
311                if (!keystore.isKeyEntry(privkeyAlias)) {
312                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
313                }
314                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
315                Certificate cert = keystore.getCertificate(privkeyAlias);
316                PublicKey pubKey = cert.getPublicKey();
317                KeyPair keypair = new KeyPair(pubKey, privKey);
318                // Get encryptionMaterials from keypair
319                encryptionMaterials = new EncryptionMaterials(keypair);
320                cryptoConfiguration = new CryptoConfiguration();
321            } catch (IOException | GeneralSecurityException e) {
322                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
323            }
324        }
325        isEncrypted = encryptionMaterials != null;
326
327        // Try to create bucket if it doesn't exist
328        if (!isEncrypted) {
329            amazonS3 = new AmazonS3Client(awsCredentialsProvider, clientConfiguration);
330        } else {
331            amazonS3 = new AmazonS3EncryptionClient(awsCredentialsProvider, new StaticEncryptionMaterialsProvider(
332                    encryptionMaterials), clientConfiguration, cryptoConfiguration);
333        }
334        if (isNotBlank(endpoint)) {
335            amazonS3.setEndpoint(endpoint);
336        }
337
338        // Set region explicitely for regions that reguire Version 4 signature
339        ArrayList<String> V4_ONLY_REGIONS = new ArrayList<String>();
340        V4_ONLY_REGIONS.add("eu-central-1");
341        V4_ONLY_REGIONS.add("ap-northeast-2");
342        if (V4_ONLY_REGIONS.contains(bucketRegion)) {
343            amazonS3.setRegion(Region.getRegion(Regions.fromName(bucketRegion)));
344        }
345
346        try {
347            if (!amazonS3.doesBucketExist(bucketName)) {
348                amazonS3.createBucket(bucketName, bucketRegion);
349                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
350            }
351        } catch (AmazonClientException e) {
352            throw new IOException(e);
353        }
354
355        // compat for NXP-17895, using "downloadfroms3", to be removed
356        // these two fields have already been initialized by the base class initialize()
357        // using standard property "directdownload"
358        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
359        if (dd != null) {
360            directDownload = Boolean.parseBoolean(dd);
361        }
362        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
363        if (dde >= 0) {
364            directDownloadExpire = dde;
365        }
366
367        transferManager = new TransferManager(amazonS3);
368        abortOldUploads();
369    }
370
371    protected void removeBinary(String digest) {
372        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
373    }
374
375    @Override
376    protected String getSystemPropertyPrefix() {
377        return SYSTEM_PROPERTY_PREFIX;
378    }
379
380    @Override
381    protected BinaryGarbageCollector instantiateGarbageCollector() {
382        return new S3BinaryGarbageCollector(this);
383    }
384
385    @Override
386    public void removeBinaries(Collection<String> digests) {
387        digests.forEach(this::removeBinary);
388    }
389
390    protected static boolean isMissingKey(AmazonClientException e) {
391        if (e instanceof AmazonServiceException) {
392            AmazonServiceException ase = (AmazonServiceException) e;
393            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
394                    || "Not Found".equals(e.getMessage());
395        }
396        return false;
397    }
398
399    public static boolean isMD5(String digest) {
400        return MD5_RE.matcher(digest).matches();
401    }
402
403    @Override
404    protected FileStorage getFileStorage() {
405        return new S3FileStorage();
406    }
407
408    public class S3FileStorage implements FileStorage {
409
410        @Override
411        public void storeFile(String digest, File file) throws IOException {
412            long t0 = 0;
413            if (log.isDebugEnabled()) {
414                t0 = System.currentTimeMillis();
415                log.debug("storing blob " + digest + " to S3");
416            }
417            String etag;
418            String key = bucketNamePrefix + digest;
419            try {
420                ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, key);
421                etag = metadata.getETag();
422                if (log.isDebugEnabled()) {
423                    log.debug("blob " + digest + " is already in S3");
424                }
425            } catch (AmazonClientException e) {
426                if (!isMissingKey(e)) {
427                    throw new IOException(e);
428                }
429                // not already present -> store the blob
430                PutObjectRequest request;
431                if (!isEncrypted) {
432                    request = new PutObjectRequest(bucketName, key, file);
433                    if (userServerSideEncryption) {
434                        ObjectMetadata objectMetadata = new ObjectMetadata();
435                        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
436                        request.setMetadata(objectMetadata);
437                    }
438                } else {
439                    request = new EncryptedPutObjectRequest(bucketName, key, file);
440                }
441                Upload upload = transferManager.upload(request);
442                try {
443                    UploadResult result = upload.waitForUploadResult();
444                    etag = result.getETag();
445                } catch (AmazonClientException ee) {
446                    throw new IOException(ee);
447                } catch (InterruptedException ee) {
448                    // reset interrupted status
449                    Thread.currentThread().interrupt();
450                    // continue interrupt
451                    throw new RuntimeException(ee);
452                } finally {
453                    if (log.isDebugEnabled()) {
454                        long dtms = System.currentTimeMillis() - t0;
455                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
456                    }
457                }
458            }
459            // check transfer went ok
460            if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) {
461                // When the blob is not encrypted by S3, the MD5 remotely
462                // computed by S3 and passed as a Etag should match the locally
463                // computed MD5 digest.
464                // This check cannot be done when encryption is enabled unless
465                // we could replicate that encryption locally just for that
466                // purpose which would add further load and complexity on the
467                // client.
468                throw new IOException("Invalid ETag in S3, ETag=" + etag + " digest=" + digest);
469            }
470        }
471
472        @Override
473        public boolean fetchFile(String digest, File file) throws IOException {
474            long t0 = 0;
475            if (log.isDebugEnabled()) {
476                t0 = System.currentTimeMillis();
477                log.debug("fetching blob " + digest + " from S3");
478            }
479            try {
480
481                ObjectMetadata metadata = amazonS3.getObject(
482                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
483                // check ETag
484                String etag = metadata.getETag();
485                if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) {
486                    log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest);
487                    return false;
488                }
489                return true;
490            } catch (AmazonClientException e) {
491                if (!isMissingKey(e)) {
492                    throw new IOException(e);
493                }
494                return false;
495            } finally {
496                if (log.isDebugEnabled()) {
497                    long dtms = System.currentTimeMillis() - t0;
498                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
499                }
500            }
501
502        }
503    }
504
505    /**
506     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
507     */
508    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {
509
510        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
511            super(binaryManager);
512        }
513
514        @Override
515        public String getId() {
516            return "s3:" + binaryManager.bucketName;
517        }
518
519        @Override
520        public Set<String> getUnmarkedBlobs() {
521            // list S3 objects in the bucket
522            // record those not marked
523            Set<String> unmarked = new HashSet<>();
524            ObjectListing list = null;
525            do {
526                if (list == null) {
527                    list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix);
528                } else {
529                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
530                }
531                int prefixLength = binaryManager.bucketNamePrefix.length();
532                for (S3ObjectSummary summary : list.getObjectSummaries()) {
533                    String digest = summary.getKey().substring(prefixLength);
534                    if (!isMD5(digest)) {
535                        // ignore files that cannot be MD5 digests for
536                        // safety
537                        continue;
538                    }
539                    long length = summary.getSize();
540                    if (marked.contains(digest)) {
541                        status.numBinaries++;
542                        status.sizeBinaries += length;
543                    } else {
544                        status.numBinariesGC++;
545                        status.sizeBinariesGC += length;
546                        // record file to delete
547                        unmarked.add(digest);
548                        marked.remove(digest); // optimize memory
549                    }
550                }
551            } while (list.isTruncated());
552
553            return unmarked;
554        }
555    }
556
557    // ******************** BlobProvider ********************
558
559    @Override
560    public Blob readBlob(BlobInfo blobInfo) throws IOException {
561        // just delegate to avoid copy/pasting code
562        return new BinaryBlobProvider(this).readBlob(blobInfo);
563    }
564
565    @Override
566    public String writeBlob(Blob blob, Document doc) throws IOException {
567        // just delegate to avoid copy/pasting code
568        return new BinaryBlobProvider(this).writeBlob(blob, doc);
569    }
570
571    @Override
572    protected boolean isDirectDownload() {
573        return directDownload;
574    }
575
576    @Override
577    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
578        String key = bucketNamePrefix + digest;
579        Date expiration = new Date();
580        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
581        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
582        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
583        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null));
584        request.setExpiration(expiration);
585        URL url = amazonS3.generatePresignedUrl(request);
586        try {
587            return url.toURI();
588        } catch (URISyntaxException e) {
589            throw new IOException(e);
590        }
591    }
592
593}