001/*
002 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Mathieu Guillaume
016 *     Florent Guillaume
017 */
018package org.nuxeo.ecm.core.storage.sql;
019
020import static org.apache.commons.lang.StringUtils.isBlank;
021import static org.apache.commons.lang.StringUtils.isNotBlank;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.URL;
029import java.security.GeneralSecurityException;
030import java.security.KeyPair;
031import java.security.KeyStore;
032import java.security.PrivateKey;
033import java.security.PublicKey;
034import java.security.cert.Certificate;
035import java.util.Collection;
036import java.util.Date;
037import java.util.HashSet;
038import java.util.Set;
039import java.util.regex.Pattern;
040
041import javax.servlet.http.HttpServletRequest;
042
043import org.apache.commons.lang.StringUtils;
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.nuxeo.common.Environment;
047import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
048import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
049import org.nuxeo.ecm.core.api.Blob;
050import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo;
051import org.nuxeo.ecm.core.blob.ManagedBlob;
052import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
053import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
054import org.nuxeo.ecm.core.blob.binary.FileStorage;
055import org.nuxeo.ecm.core.model.Document;
056import org.nuxeo.runtime.api.Framework;
057
058import com.amazonaws.AmazonClientException;
059import com.amazonaws.AmazonServiceException;
060import com.amazonaws.ClientConfiguration;
061import com.amazonaws.HttpMethod;
062import com.amazonaws.auth.AWSCredentialsProvider;
063import com.amazonaws.auth.InstanceProfileCredentialsProvider;
064import com.amazonaws.services.s3.AmazonS3;
065import com.amazonaws.services.s3.AmazonS3Client;
066import com.amazonaws.services.s3.AmazonS3EncryptionClient;
067import com.amazonaws.services.s3.internal.ServiceUtils;
068import com.amazonaws.services.s3.model.CannedAccessControlList;
069import com.amazonaws.services.s3.model.CryptoConfiguration;
070import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
071import com.amazonaws.services.s3.model.EncryptionMaterials;
072import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
073import com.amazonaws.services.s3.model.GetObjectRequest;
074import com.amazonaws.services.s3.model.ObjectListing;
075import com.amazonaws.services.s3.model.ObjectMetadata;
076import com.amazonaws.services.s3.model.PutObjectRequest;
077import com.amazonaws.services.s3.model.S3ObjectSummary;
078import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
079import com.amazonaws.services.s3.transfer.TransferManager;
080import com.amazonaws.services.s3.transfer.Upload;
081import com.amazonaws.services.s3.transfer.model.UploadResult;
082import com.google.common.base.MoreObjects;
083
084/**
085 * A Binary Manager that stores binaries as S3 BLOBs
086 * <p>
087 * The BLOBs are cached locally on first access for efficiency.
088 * <p>
089 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
090 * if accessed before the stream.
091 */
092public class S3BinaryManager extends AbstractCloudBinaryManager {
093
    /**
     * Name of the digest algorithm used for stored binaries. The ETag checks done in {@code S3FileStorage} compare
     * the S3 ETag with the digest, which is why this has to be MD5.
     */
    private static final String MD5 = "MD5"; // must be MD5 for Etag

    /**
     * Returns the digest algorithm this binary manager uses by default.
     *
     * @return the {@code "MD5"} constant
     */
    @Override
    protected String getDefaultDigestAlgorithm() {
        return MD5;
    }
100
    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    /** Prefix returned by {@link #getPropertyPrefix()}. */
    public static final String PROPERTY_PREFIX = "nuxeo.s3storage";

    /** Framework property holding the bucket name; {@link #setupCloudClient()} fails when it is blank. */
    public static final String BUCKET_NAME_KEY = "nuxeo.s3storage.bucket";

    /**
     * Framework property holding an optional prefix prepended to digests to build object keys. A trailing '/' is
     * appended during setup when it is missing.
     */
    public static final String BUCKET_PREFIX_KEY = "nuxeo.s3storage.bucket.prefix";

    /** Framework property holding the region passed to S3 when the bucket has to be created. */
    public static final String BUCKET_REGION_KEY = "nuxeo.s3storage.region";

    /** Region used when {@link #BUCKET_REGION_KEY} is blank. */
    public static final String DEFAULT_BUCKET_REGION = null; // US East

    /** Framework property holding the AWS access key id. */
    public static final String AWS_ID_KEY = "nuxeo.s3storage.awsid";

    /** Environment variable read when {@link #AWS_ID_KEY} is blank. */
    public static final String AWS_ID_ENV_KEY = "AWS_ACCESS_KEY_ID";

    /** Framework property holding the AWS secret key. */
    public static final String AWS_SECRET_KEY = "nuxeo.s3storage.awssecret";

    /** Environment variable read when {@link #AWS_SECRET_KEY} is blank. */
    public static final String AWS_SECRET_ENV_KEY = "AWS_SECRET_ACCESS_KEY";

    // NOTE(review): not read anywhere in this class; presumably consumed by the parent class, to be confirmed
    public static final String CACHE_SIZE_KEY = "nuxeo.s3storage.cachesize";

    /** Applied to the client configuration only when > 0. AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_KEY = "nuxeo.s3storage.connection.max";

    /** Applied only when >= 0. AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_KEY = "nuxeo.s3storage.connection.retry";

    /** Applied only when >= 0. AWS ClientConfiguration default 50*1000 = 50s */
    public static final String CONNECTION_TIMEOUT_KEY = "nuxeo.s3storage.connection.timeout";

    /** Applied only when >= 0. AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_KEY = "nuxeo.s3storage.socket.timeout";

    /** Framework property holding the keystore file path; a non-blank value turns client-side encryption on. */
    public static final String KEYSTORE_FILE_KEY = "nuxeo.s3storage.crypt.keystore.file";

    /** Framework property holding the keystore password (may be blank but must be set when encrypting). */
    public static final String KEYSTORE_PASS_KEY = "nuxeo.s3storage.crypt.keystore.password";

    /** Framework property holding the alias of the key entry used for encryption. */
    public static final String PRIVKEY_ALIAS_KEY = "nuxeo.s3storage.crypt.key.alias";

    /** Framework property holding the key password (may be blank but must be set when encrypting). */
    public static final String PRIVKEY_PASS_KEY = "nuxeo.s3storage.crypt.key.password";

    /** Framework property holding an optional endpoint set on the S3 client. */
    public static final String ENDPOINT_KEY = "nuxeo.s3storage.endpoint";

    /** Framework property (boolean) enabling direct download from S3. */
    public static final String DIRECTDOWNLOAD_KEY = "nuxeo.s3storage.downloadfroms3";

    /** Framework property holding the validity, in seconds, of the pre-signed direct download URLs. */
    public static final String DIRECTDOWNLOAD_EXPIRE_KEY = "nuxeo.s3storage.downloadfroms3.expire";

    /** Matches 32 lowercase hexadecimal characters, optionally preceded by any path ending with '/'. */
    private static final Pattern MD5_RE = Pattern.compile("(.*/)?[0-9a-f]{32}");

    /** Name of the bucket holding the binaries. */
    protected String bucketName;

    /** Prefix of the object keys inside the bucket; empty, or ending with '/' after setup. */
    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    /** Encryption materials built from the keystore, or {@code null} when encryption is not configured. */
    protected EncryptionMaterials encryptionMaterials;

    /** Whether {@link #encryptionMaterials} is set; ETag/digest comparisons are skipped in that case. */
    protected boolean isEncrypted;

    protected CryptoConfiguration cryptoConfiguration;

    /** The S3 client, plain or encrypting depending on {@link #isEncrypted}. */
    protected AmazonS3 amazonS3;

    /** Transfer manager built on {@link #amazonS3}, used for uploads. */
    protected TransferManager transferManager;
168
169    @Override
170    public void close() {
171        // this also shuts down the AmazonS3Client
172        transferManager.shutdownNow();
173        super.close();
174    }
175
176    /**
177     * Aborts uploads that crashed and are older than 1 day.
178     *
179     * @since 7.2
180     */
181    protected void abortOldUploads() throws IOException {
182        int oneDay = 1000 * 60 * 60 * 24;
183        try {
184            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
185        } catch (AmazonClientException e) {
186            throw new IOException("Failed to abort old uploads", e);
187        }
188    }
189
190    @Override
191    protected void setupCloudClient() throws IOException {
192        // Get settings from the configuration
193        // TODO parse properties too
194        bucketName = Framework.getProperty(BUCKET_NAME_KEY);
195        bucketNamePrefix = MoreObjects.firstNonNull(Framework.getProperty(BUCKET_PREFIX_KEY), StringUtils.EMPTY);
196        String bucketRegion = Framework.getProperty(BUCKET_REGION_KEY);
197        if (isBlank(bucketRegion)) {
198            bucketRegion = DEFAULT_BUCKET_REGION;
199        }
200        String awsID = Framework.getProperty(AWS_ID_KEY);
201        String awsSecret = Framework.getProperty(AWS_SECRET_KEY);
202
203        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
204        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
205        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
206        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);
207
208        int maxConnections = getIntFrameworkProperty(CONNECTION_MAX_KEY);
209        int maxErrorRetry = getIntFrameworkProperty(CONNECTION_RETRY_KEY);
210        int connectionTimeout = getIntFrameworkProperty(CONNECTION_TIMEOUT_KEY);
211        int socketTimeout = getIntFrameworkProperty(SOCKET_TIMEOUT_KEY);
212
213        String keystoreFile = Framework.getProperty(KEYSTORE_FILE_KEY);
214        String keystorePass = Framework.getProperty(KEYSTORE_PASS_KEY);
215        String privkeyAlias = Framework.getProperty(PRIVKEY_ALIAS_KEY);
216        String privkeyPass = Framework.getProperty(PRIVKEY_PASS_KEY);
217        String endpoint = Framework.getProperty(ENDPOINT_KEY);
218
219        // Fallback on default env keys for ID and secret
220        if (isBlank(awsID)) {
221            awsID = System.getenv(AWS_ID_ENV_KEY);
222        }
223        if (isBlank(awsSecret)) {
224            awsSecret = System.getenv(AWS_SECRET_ENV_KEY);
225        }
226
227        if (isBlank(bucketName)) {
228            throw new RuntimeException("Missing conf: " + BUCKET_NAME_KEY);
229        }
230
231        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) {
232            log.warn(String.format("%s %s S3 bucket prefix should end by '/' " + ": added automatically.",
233                    BUCKET_PREFIX_KEY, bucketNamePrefix));
234            bucketNamePrefix += "/";
235        }
236        // set up credentials
237        if (isBlank(awsID) || isBlank(awsSecret)) {
238            awsCredentialsProvider = new InstanceProfileCredentialsProvider();
239            try {
240                awsCredentialsProvider.getCredentials();
241            } catch (AmazonClientException e) {
242                throw new RuntimeException("Missing AWS credentials and no instance role found");
243            }
244        } else {
245            awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret);
246        }
247
248        // set up client configuration
249        clientConfiguration = new ClientConfiguration();
250        if (isNotBlank(proxyHost)) {
251            clientConfiguration.setProxyHost(proxyHost);
252        }
253        if (isNotBlank(proxyPort)) {
254            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
255        }
256        if (isNotBlank(proxyLogin)) {
257            clientConfiguration.setProxyUsername(proxyLogin);
258        }
259        if (proxyPassword != null) { // could be blank
260            clientConfiguration.setProxyPassword(proxyPassword);
261        }
262        if (maxConnections > 0) {
263            clientConfiguration.setMaxConnections(maxConnections);
264        }
265        if (maxErrorRetry >= 0) { // 0 is allowed
266            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
267        }
268        if (connectionTimeout >= 0) { // 0 is allowed
269            clientConfiguration.setConnectionTimeout(connectionTimeout);
270        }
271        if (socketTimeout >= 0) { // 0 is allowed
272            clientConfiguration.setSocketTimeout(socketTimeout);
273        }
274
275        // set up encryption
276        encryptionMaterials = null;
277        if (isNotBlank(keystoreFile)) {
278            boolean confok = true;
279            if (keystorePass == null) { // could be blank
280                log.error("Keystore password missing");
281                confok = false;
282            }
283            if (isBlank(privkeyAlias)) {
284                log.error("Key alias missing");
285                confok = false;
286            }
287            if (privkeyPass == null) { // could be blank
288                log.error("Key password missing");
289                confok = false;
290            }
291            if (!confok) {
292                throw new RuntimeException("S3 Crypto configuration incomplete");
293            }
294            try {
295                // Open keystore
296                File ksFile = new File(keystoreFile);
297                FileInputStream ksStream = new FileInputStream(ksFile);
298                KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
299                keystore.load(ksStream, keystorePass.toCharArray());
300                ksStream.close();
301                // Get keypair for alias
302                if (!keystore.isKeyEntry(privkeyAlias)) {
303                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
304                }
305                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
306                Certificate cert = keystore.getCertificate(privkeyAlias);
307                PublicKey pubKey = cert.getPublicKey();
308                KeyPair keypair = new KeyPair(pubKey, privKey);
309                // Get encryptionMaterials from keypair
310                encryptionMaterials = new EncryptionMaterials(keypair);
311                cryptoConfiguration = new CryptoConfiguration();
312            } catch (IOException | GeneralSecurityException e) {
313                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
314            }
315        }
316        isEncrypted = encryptionMaterials != null;
317
318        // Try to create bucket if it doesn't exist
319        if (!isEncrypted) {
320            amazonS3 = new AmazonS3Client(awsCredentialsProvider, clientConfiguration);
321        } else {
322            amazonS3 = new AmazonS3EncryptionClient(awsCredentialsProvider, new StaticEncryptionMaterialsProvider(
323                    encryptionMaterials), clientConfiguration, cryptoConfiguration);
324        }
325        if (isNotBlank(endpoint)) {
326            amazonS3.setEndpoint(endpoint);
327        }
328
329        try {
330            if (!amazonS3.doesBucketExist(bucketName)) {
331                amazonS3.createBucket(bucketName, bucketRegion);
332                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
333            }
334        } catch (AmazonClientException e) {
335            throw new IOException(e);
336        }
337
338        directDownload = Boolean.parseBoolean(Framework.getProperty(DIRECTDOWNLOAD_KEY, DEFAULT_DIRECTDOWNLOAD));
339        directDownloadExpire = getIntFrameworkProperty(DIRECTDOWNLOAD_EXPIRE_KEY);
340        if (directDownloadExpire < 0) {
341            directDownloadExpire = DEFAULT_DIRECTDOWNLOAD_EXPIRE;
342        }
343
344        transferManager = new TransferManager(amazonS3);
345        abortOldUploads();
346    }
347
348    protected void removeBinary(String digest) {
349        amazonS3.deleteObject(bucketName, digest);
350    }
351
    /**
     * Returns the prefix of the configuration properties of this binary manager.
     *
     * @return {@link #PROPERTY_PREFIX}
     */
    @Override
    protected String getPropertyPrefix() {
        return PROPERTY_PREFIX;
    }
356
    /**
     * Creates the garbage collector for this binary manager.
     *
     * @return a new {@link S3BinaryGarbageCollector} bound to this instance
     */
    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new S3BinaryGarbageCollector(this);
    }
361
362    @Override
363    public void removeBinaries(Collection<String> digests) {
364        digests.forEach(this::removeBinary);
365    }
366
367    protected static boolean isMissingKey(AmazonClientException e) {
368        if (e instanceof AmazonServiceException) {
369            AmazonServiceException ase = (AmazonServiceException) e;
370            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
371                    || "Not Found".equals(e.getMessage());
372        }
373        return false;
374    }
375
376    public static boolean isMD5(String digest) {
377        return MD5_RE.matcher(digest).matches();
378    }
379
    /**
     * Creates the file storage that reads and writes binaries in S3.
     *
     * @return a new {@link S3FileStorage}
     */
    @Override
    protected FileStorage getFileStorage() {
        return new S3FileStorage();
    }
384
385    public class S3FileStorage implements FileStorage {
386
387        @Override
388        public void storeFile(String digest, File file) throws IOException {
389            long t0 = 0;
390            if (log.isDebugEnabled()) {
391                t0 = System.currentTimeMillis();
392                log.debug("storing blob " + digest + " to S3");
393            }
394            String etag;
395            String key = bucketNamePrefix + digest;
396            try {
397                ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, key);
398                etag = metadata.getETag();
399                if (log.isDebugEnabled()) {
400                    log.debug("blob " + digest + " is already in S3");
401                }
402            } catch (AmazonClientException e) {
403                if (!isMissingKey(e)) {
404                    throw new IOException(e);
405                }
406                // not already present -> store the blob
407                PutObjectRequest request;
408                if (!isEncrypted) {
409                    request = new PutObjectRequest(bucketName, key, file);
410                } else {
411                    request = new EncryptedPutObjectRequest(bucketName, key, file);
412                }
413                Upload upload = transferManager.upload(request);
414                try {
415                    UploadResult result = upload.waitForUploadResult();
416                    etag = result.getETag();
417                } catch (AmazonClientException ee) {
418                    throw new IOException(ee);
419                } catch (InterruptedException ee) {
420                    // reset interrupted status
421                    Thread.currentThread().interrupt();
422                    // continue interrupt
423                    throw new RuntimeException(ee);
424                } finally {
425                    if (log.isDebugEnabled()) {
426                        long dtms = System.currentTimeMillis() - t0;
427                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
428                    }
429                }
430            }
431            // check transfer went ok
432            if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) {
433                // When the blob is not encrypted by S3, the MD5 remotely
434                // computed by S3 and passed as a Etag should match the locally
435                // computed MD5 digest.
436                // This check cannot be done when encryption is enabled unless
437                // we could replicate that encryption locally just for that
438                // purpose which would add further load and complexity on the
439                // client.
440                throw new IOException("Invalid ETag in S3, ETag=" + etag + " digest=" + digest);
441            }
442        }
443
444        @Override
445        public boolean fetchFile(String digest, File file) throws IOException {
446            long t0 = 0;
447            if (log.isDebugEnabled()) {
448                t0 = System.currentTimeMillis();
449                log.debug("fetching blob " + digest + " from S3");
450            }
451            try {
452
453                ObjectMetadata metadata = amazonS3.getObject(
454                        new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
455                // check ETag
456                String etag = metadata.getETag();
457                if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) {
458                    log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest);
459                    return false;
460                }
461                return true;
462            } catch (AmazonClientException e) {
463                if (!isMissingKey(e)) {
464                    throw new IOException(e);
465                }
466                return false;
467            } finally {
468                if (log.isDebugEnabled()) {
469                    long dtms = System.currentTimeMillis() - t0;
470                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
471                }
472            }
473
474        }
475
476        @Override
477        public Long fetchLength(String digest) throws IOException {
478            long t0 = 0;
479            if (log.isDebugEnabled()) {
480                t0 = System.currentTimeMillis();
481                log.debug("fetching blob length " + digest + " from S3");
482            }
483            try {
484                ObjectMetadata metadata = amazonS3.getObjectMetadata(bucketName, bucketNamePrefix + digest);
485                // check ETag
486                String etag = metadata.getETag();
487                if (!isEncrypted && !etag.equals(digest) && !ServiceUtils.isMultipartUploadETag(etag)) {
488                    log.error("Invalid ETag in S3, ETag=" + etag + " digest=" + digest);
489                    return null;
490                }
491                return Long.valueOf(metadata.getContentLength());
492            } catch (AmazonClientException e) {
493                if (!isMissingKey(e)) {
494                    throw new IOException(e);
495                }
496                return null;
497            } finally {
498                if (log.isDebugEnabled()) {
499                    long dtms = System.currentTimeMillis() - t0;
500                    log.debug("fetched blob length " + digest + " from S3 in " + dtms + "ms");
501                }
502            }
503        }
504    }
505
506    /**
507     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
508     */
509    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {
510
511        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
512            super(binaryManager);
513        }
514
515        @Override
516        public String getId() {
517            return "s3:" + binaryManager.bucketName;
518        }
519
520        @Override
521        public Set<String> getUnmarkedBlobs() {
522            // list S3 objects in the bucket
523            // record those not marked
524            Set<String> unmarked = new HashSet<>();
525            ObjectListing list = null;
526            do {
527                if (list == null) {
528                    list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix);
529                } else {
530                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
531                }
532                for (S3ObjectSummary summary : list.getObjectSummaries()) {
533                    String digest = summary.getKey();
534                    if (!isMD5(digest)) {
535                        // ignore files that cannot be MD5 digests for
536                        // safety
537                        continue;
538                    }
539                    long length = summary.getSize();
540                    if (marked.contains(digest)) {
541                        status.numBinaries++;
542                        status.sizeBinaries += length;
543                    } else {
544                        status.numBinariesGC++;
545                        status.sizeBinariesGC += length;
546                        // record file to delete
547                        unmarked.add(digest);
548                        marked.remove(digest); // optimize memory
549                    }
550                }
551            } while (list.isTruncated());
552
553            return unmarked;
554        }
555    }
556
557    // ******************** BlobProvider ********************
558
559    @Override
560    public Blob readBlob(BlobInfo blobInfo) throws IOException {
561        // just delegate to avoid copy/pasting code
562        return new BinaryBlobProvider(this).readBlob(blobInfo);
563    }
564
565    @Override
566    public String writeBlob(Blob blob, Document doc) throws IOException {
567        // just delegate to avoid copy/pasting code
568        return new BinaryBlobProvider(this).writeBlob(blob, doc);
569    }
570
    /**
     * Tells whether direct download from S3 is enabled.
     *
     * @return the {@code directDownload} flag, set during setup from the {@link #DIRECTDOWNLOAD_KEY} property
     */
    @Override
    protected boolean isDirectDownload() {
        return directDownload;
    }
575
576    @Override
577    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
578        String key = bucketNamePrefix + digest;
579        Date expiration = new Date();
580        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
581        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
582        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
583        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, servletRequest));
584        request.setExpiration(expiration);
585        URL url = amazonS3.generatePresignedUrl(request);
586        try {
587            return url.toURI();
588        } catch (URISyntaxException e) {
589            throw new IOException(e);
590        }
591    }
592
593}