001/*
002 * (C) Copyright 2011-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mathieu Guillaume
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.sql;
021
022import static org.apache.commons.lang.StringUtils.isBlank;
023import static org.apache.commons.lang.StringUtils.isNotBlank;
024
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.IOException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.net.URL;
031import java.security.GeneralSecurityException;
032import java.security.KeyPair;
033import java.security.KeyStore;
034import java.security.PrivateKey;
035import java.security.PublicKey;
036import java.security.cert.Certificate;
037import java.util.Collection;
038import java.util.Date;
039import java.util.HashSet;
040import java.util.Set;
041import java.util.regex.Pattern;
042
043import org.apache.commons.codec.digest.DigestUtils;
044
045import javax.servlet.http.HttpServletRequest;
046
047import org.apache.commons.lang.StringUtils;
048import org.apache.commons.logging.Log;
049import org.apache.commons.logging.LogFactory;
050import org.nuxeo.common.Environment;
051import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
052import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
053import org.nuxeo.ecm.core.blob.ManagedBlob;
054import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
055import org.nuxeo.ecm.core.blob.binary.FileStorage;
056import org.nuxeo.runtime.api.Framework;
057
058import com.amazonaws.AmazonClientException;
059import com.amazonaws.AmazonServiceException;
060import com.amazonaws.ClientConfiguration;
061import com.amazonaws.HttpMethod;
062import com.amazonaws.auth.AWSCredentialsProvider;
063import com.amazonaws.auth.InstanceProfileCredentialsProvider;
064import com.amazonaws.services.s3.AmazonS3;
065import com.amazonaws.services.s3.AmazonS3ClientBuilder;
066import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder;
067import com.amazonaws.services.s3.model.CannedAccessControlList;
068import com.amazonaws.services.s3.model.CryptoConfiguration;
069import com.amazonaws.services.s3.model.EncryptedPutObjectRequest;
070import com.amazonaws.services.s3.model.EncryptionMaterials;
071import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
072import com.amazonaws.services.s3.model.GetObjectRequest;
073import com.amazonaws.services.s3.model.ObjectListing;
074import com.amazonaws.services.s3.model.ObjectMetadata;
075import com.amazonaws.services.s3.model.PutObjectRequest;
076import com.amazonaws.services.s3.model.S3ObjectSummary;
077import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
078import com.amazonaws.services.s3.transfer.Download;
079import com.amazonaws.services.s3.transfer.TransferManager;
080import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
081import com.amazonaws.services.s3.transfer.Upload;
082import com.google.common.base.MoreObjects;
083
084/**
085 * A Binary Manager that stores binaries as S3 BLOBs
086 * <p>
087 * The BLOBs are cached locally on first access for efficiency.
088 * <p>
089 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
090 * if accessed before the stream.
091 */
092public class S3BinaryManager extends AbstractCloudBinaryManager {
093
094    private static final String MD5 = "MD5"; // must be MD5 for Etag
095
096    @Override
097    protected String getDefaultDigestAlgorithm() {
098        return MD5;
099    }
100
    private static final Log log = LogFactory.getLog(S3BinaryManager.class);

    /** Prefix returned by {@link #getSystemPropertyPrefix()}. */
    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.s3storage";

    /** Name of the S3 bucket; mandatory, setup fails when it is blank. */
    public static final String BUCKET_NAME_PROPERTY = "bucket";

    /** Optional prefix prepended to every object key; a trailing '/' is added during setup if missing. */
    public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

    /** Region of the bucket; {@link #DEFAULT_BUCKET_REGION} is used when blank. */
    public static final String BUCKET_REGION_PROPERTY = "region";

    public static final String DEFAULT_BUCKET_REGION = "us-east-1"; // US East

    /** AWS access key id; falls back to the {@link #AWS_ID_ENV} environment variable when blank. */
    public static final String AWS_ID_PROPERTY = "awsid";

    public static final String AWS_ID_ENV = "AWS_ACCESS_KEY_ID";

    /** AWS secret key; falls back to the {@link #AWS_SECRET_ENV} environment variable when blank. */
    public static final String AWS_SECRET_PROPERTY = "awssecret";

    public static final String AWS_SECRET_ENV = "AWS_SECRET_ACCESS_KEY";

    /** AWS ClientConfiguration default 50 */
    public static final String CONNECTION_MAX_PROPERTY = "connection.max";

    /** AWS ClientConfiguration default 3 (with exponential backoff) */
    public static final String CONNECTION_RETRY_PROPERTY = "connection.retry";

    /**
     * AWS ClientConfiguration default 50*1000 = 50s
     * <p>
     * NOTE(review): recent AWS SDK versions appear to default the connection timeout to 10s rather than 50s; confirm
     * against the SDK version in use.
     */
    public static final String CONNECTION_TIMEOUT_PROPERTY = "connection.timeout";

    /** AWS ClientConfiguration default 50*1000 = 50s */
    public static final String SOCKET_TIMEOUT_PROPERTY = "socket.timeout";

    /** Path of the keystore file; when set, client-side encryption is configured during setup. */
    public static final String KEYSTORE_FILE_PROPERTY = "crypt.keystore.file";

    /** Password of the keystore; required (may be blank) when a keystore file is configured. */
    public static final String KEYSTORE_PASS_PROPERTY = "crypt.keystore.password";

    /** Boolean flag; when true and client-side encryption is off, uploads request AES-256 server-side encryption. */
    public static final String SERVERSIDE_ENCRYPTION_PROPERTY = "crypt.serverside";

    /** Alias of the key entry in the keystore; required when a keystore file is configured. */
    public static final String PRIVKEY_ALIAS_PROPERTY = "crypt.key.alias";

    /** Password of the key entry; required (may be blank) when a keystore file is configured. */
    public static final String PRIVKEY_PASS_PROPERTY = "crypt.key.password";

    /** Optional custom S3 endpoint applied to the client during setup. */
    public static final String ENDPOINT_PROPERTY = "endpoint";

    /** Compatibility property overriding the direct download flag during setup. */
    public static final String DIRECTDOWNLOAD_PROPERTY_COMPAT = "downloadfroms3";

    /** Compatibility property overriding the direct download expiration during setup. */
    public static final String DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT = "downloadfroms3.expire";

    /** Matches exactly 32 lowercase hexadecimal characters, the textual form of an MD5 digest. */
    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    /** Bucket name read from {@link #BUCKET_NAME_PROPERTY}. */
    protected String bucketName;

    /** Key prefix read from {@link #BUCKET_PREFIX_PROPERTY}; empty when not configured. */
    protected String bucketNamePrefix;

    protected AWSCredentialsProvider awsCredentialsProvider;

    protected ClientConfiguration clientConfiguration;

    /** Client-side encryption materials, or {@code null} when no keystore is configured. */
    protected EncryptionMaterials encryptionMaterials;

    /** Whether client-side encryption is active, i.e. {@link #encryptionMaterials} is not null. */
    protected boolean isEncrypted;

    protected CryptoConfiguration cryptoConfiguration;

    /** Value of {@link #SERVERSIDE_ENCRYPTION_PROPERTY}. */
    protected boolean userServerSideEncryption;

    protected AmazonS3 amazonS3;

    /** Transfer manager wrapping {@link #amazonS3}, used for uploads and downloads. */
    protected TransferManager transferManager;
170
171    @Override
172    public void close() {
173        // this also shuts down the AmazonS3Client
174        transferManager.shutdownNow();
175        super.close();
176    }
177
178    /**
179     * Aborts uploads that crashed and are older than 1 day.
180     *
181     * @since 7.2
182     */
183    protected void abortOldUploads() throws IOException {
184        int oneDay = 1000 * 60 * 60 * 24;
185        try {
186            transferManager.abortMultipartUploads(bucketName, new Date(System.currentTimeMillis() - oneDay));
187        } catch (AmazonClientException e) {
188            throw new IOException("Failed to abort old uploads", e);
189        }
190    }
191
192    @Override
193    protected void setupCloudClient() throws IOException {
194        // Get settings from the configuration
195        bucketName = getProperty(BUCKET_NAME_PROPERTY);
196        bucketNamePrefix = MoreObjects.firstNonNull(getProperty(BUCKET_PREFIX_PROPERTY), StringUtils.EMPTY);
197        String bucketRegion = getProperty(BUCKET_REGION_PROPERTY);
198        if (isBlank(bucketRegion)) {
199            bucketRegion = DEFAULT_BUCKET_REGION;
200        }
201        String awsID = getProperty(AWS_ID_PROPERTY);
202        String awsSecret = getProperty(AWS_SECRET_PROPERTY);
203
204        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
205        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
206        String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
207        String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);
208
209        int maxConnections = getIntProperty(CONNECTION_MAX_PROPERTY);
210        int maxErrorRetry = getIntProperty(CONNECTION_RETRY_PROPERTY);
211        int connectionTimeout = getIntProperty(CONNECTION_TIMEOUT_PROPERTY);
212        int socketTimeout = getIntProperty(SOCKET_TIMEOUT_PROPERTY);
213
214        String keystoreFile = getProperty(KEYSTORE_FILE_PROPERTY);
215        String keystorePass = getProperty(KEYSTORE_PASS_PROPERTY);
216        String privkeyAlias = getProperty(PRIVKEY_ALIAS_PROPERTY);
217        String privkeyPass = getProperty(PRIVKEY_PASS_PROPERTY);
218        String endpoint = getProperty(ENDPOINT_PROPERTY);
219        String sseprop = getProperty(SERVERSIDE_ENCRYPTION_PROPERTY);
220        if (isNotBlank(sseprop)) {
221            userServerSideEncryption = Boolean.parseBoolean(sseprop);
222        }
223
224        // Fallback on default env keys for ID and secret
225        if (isBlank(awsID)) {
226            awsID = System.getenv(AWS_ID_ENV);
227        }
228        if (isBlank(awsSecret)) {
229            awsSecret = System.getenv(AWS_SECRET_ENV);
230        }
231
232        if (isBlank(bucketName)) {
233            throw new RuntimeException("Missing conf: " + BUCKET_NAME_PROPERTY);
234        }
235
236        if (!isBlank(bucketNamePrefix) && !bucketNamePrefix.endsWith("/")) {
237            log.warn(String.format("%s %s S3 bucket prefix should end by '/' " + ": added automatically.",
238                    BUCKET_PREFIX_PROPERTY, bucketNamePrefix));
239            bucketNamePrefix += "/";
240        }
241        // set up credentials
242        if (isBlank(awsID) || isBlank(awsSecret)) {
243            awsCredentialsProvider = InstanceProfileCredentialsProvider.getInstance();
244            try {
245                awsCredentialsProvider.getCredentials();
246            } catch (AmazonClientException e) {
247                throw new RuntimeException("Missing AWS credentials and no instance role found");
248            }
249        } else {
250            awsCredentialsProvider = new BasicAWSCredentialsProvider(awsID, awsSecret);
251        }
252
253        // set up client configuration
254        clientConfiguration = new ClientConfiguration();
255        if (isNotBlank(proxyHost)) {
256            clientConfiguration.setProxyHost(proxyHost);
257        }
258        if (isNotBlank(proxyPort)) {
259            clientConfiguration.setProxyPort(Integer.parseInt(proxyPort));
260        }
261        if (isNotBlank(proxyLogin)) {
262            clientConfiguration.setProxyUsername(proxyLogin);
263        }
264        if (proxyPassword != null) { // could be blank
265            clientConfiguration.setProxyPassword(proxyPassword);
266        }
267        if (maxConnections > 0) {
268            clientConfiguration.setMaxConnections(maxConnections);
269        }
270        if (maxErrorRetry >= 0) { // 0 is allowed
271            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
272        }
273        if (connectionTimeout >= 0) { // 0 is allowed
274            clientConfiguration.setConnectionTimeout(connectionTimeout);
275        }
276        if (socketTimeout >= 0) { // 0 is allowed
277            clientConfiguration.setSocketTimeout(socketTimeout);
278        }
279
280        // set up encryption
281        encryptionMaterials = null;
282        if (isNotBlank(keystoreFile)) {
283            boolean confok = true;
284            if (keystorePass == null) { // could be blank
285                log.error("Keystore password missing");
286                confok = false;
287            }
288            if (isBlank(privkeyAlias)) {
289                log.error("Key alias missing");
290                confok = false;
291            }
292            if (privkeyPass == null) { // could be blank
293                log.error("Key password missing");
294                confok = false;
295            }
296            if (!confok) {
297                throw new RuntimeException("S3 Crypto configuration incomplete");
298            }
299            try {
300                // Open keystore
301                File ksFile = new File(keystoreFile);
302                FileInputStream ksStream = new FileInputStream(ksFile);
303                KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
304                keystore.load(ksStream, keystorePass.toCharArray());
305                ksStream.close();
306                // Get keypair for alias
307                if (!keystore.isKeyEntry(privkeyAlias)) {
308                    throw new RuntimeException("Alias " + privkeyAlias + " is missing or not a key alias");
309                }
310                PrivateKey privKey = (PrivateKey) keystore.getKey(privkeyAlias, privkeyPass.toCharArray());
311                Certificate cert = keystore.getCertificate(privkeyAlias);
312                PublicKey pubKey = cert.getPublicKey();
313                KeyPair keypair = new KeyPair(pubKey, privKey);
314                // Get encryptionMaterials from keypair
315                encryptionMaterials = new EncryptionMaterials(keypair);
316                cryptoConfiguration = new CryptoConfiguration();
317            } catch (IOException | GeneralSecurityException e) {
318                throw new RuntimeException("Could not read keystore: " + keystoreFile + ", alias: " + privkeyAlias, e);
319            }
320        }
321        isEncrypted = encryptionMaterials != null;
322
323        // Try to create bucket if it doesn't exist
324        if (!isEncrypted) {
325            amazonS3 = AmazonS3ClientBuilder.standard()
326                            .withCredentials(awsCredentialsProvider)
327                            .withClientConfiguration(clientConfiguration)
328                            .withRegion(bucketRegion)
329                            .build();
330        } else {
331            amazonS3 = AmazonS3EncryptionClientBuilder.standard()
332                            .withClientConfiguration(clientConfiguration)
333                            .withCryptoConfiguration(cryptoConfiguration)
334                            .withCredentials(awsCredentialsProvider)
335                            .withRegion(bucketRegion)
336                            .withEncryptionMaterials(new StaticEncryptionMaterialsProvider(encryptionMaterials))
337                            .build();
338        }
339        if (isNotBlank(endpoint)) {
340            amazonS3.setEndpoint(endpoint);
341        }
342
343        try {
344            if (!amazonS3.doesBucketExist(bucketName)) {
345                amazonS3.createBucket(bucketName);
346                amazonS3.setBucketAcl(bucketName, CannedAccessControlList.Private);
347            }
348        } catch (AmazonClientException e) {
349            throw new IOException(e);
350        }
351
352        // compat for NXP-17895, using "downloadfroms3", to be removed
353        // these two fields have already been initialized by the base class initialize()
354        // using standard property "directdownload"
355        String dd = getProperty(DIRECTDOWNLOAD_PROPERTY_COMPAT);
356        if (dd != null) {
357            directDownload = Boolean.parseBoolean(dd);
358        }
359        int dde = getIntProperty(DIRECTDOWNLOAD_EXPIRE_PROPERTY_COMPAT);
360        if (dde >= 0) {
361            directDownloadExpire = dde;
362        }
363
364        transferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build();
365        abortOldUploads();
366    }
367
368    protected void removeBinary(String digest) {
369        amazonS3.deleteObject(bucketName, bucketNamePrefix + digest);
370    }
371
372    @Override
373    protected String getSystemPropertyPrefix() {
374        return SYSTEM_PROPERTY_PREFIX;
375    }
376
377    @Override
378    protected BinaryGarbageCollector instantiateGarbageCollector() {
379        return new S3BinaryGarbageCollector(this);
380    }
381
382    @Override
383    public void removeBinaries(Collection<String> digests) {
384        digests.forEach(this::removeBinary);
385    }
386
387    protected static boolean isMissingKey(AmazonClientException e) {
388        if (e instanceof AmazonServiceException) {
389            AmazonServiceException ase = (AmazonServiceException) e;
390            return (ase.getStatusCode() == 404) || "NoSuchKey".equals(ase.getErrorCode())
391                    || "Not Found".equals(e.getMessage());
392        }
393        return false;
394    }
395
396    public static boolean isMD5(String digest) {
397        return MD5_RE.matcher(digest).matches();
398    }
399
400    @Override
401    protected FileStorage getFileStorage() {
402        return new S3FileStorage();
403    }
404
405    public class S3FileStorage implements FileStorage {
406
407        @Override
408        public void storeFile(String digest, File file) throws IOException {
409            long t0 = 0;
410            if (log.isDebugEnabled()) {
411                t0 = System.currentTimeMillis();
412                log.debug("storing blob " + digest + " to S3");
413            }
414            String key = bucketNamePrefix + digest;
415            try {
416                amazonS3.getObjectMetadata(bucketName, key);
417                if (log.isDebugEnabled()) {
418                    log.debug("blob " + digest + " is already in S3");
419                }
420            } catch (AmazonClientException e) {
421                if (!isMissingKey(e)) {
422                    throw new IOException(e);
423                }
424                // not already present -> store the blob
425                PutObjectRequest request;
426                if (!isEncrypted) {
427                    request = new PutObjectRequest(bucketName, key, file);
428                    if (userServerSideEncryption) {
429                        ObjectMetadata objectMetadata = new ObjectMetadata();
430                        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
431                        request.setMetadata(objectMetadata);
432                    }
433                } else {
434                    request = new EncryptedPutObjectRequest(bucketName, key, file);
435                }
436                Upload upload = transferManager.upload(request);
437                try {
438                    upload.waitForUploadResult();
439                } catch (AmazonClientException ee) {
440                    throw new IOException(ee);
441                } catch (InterruptedException ee) {
442                    // reset interrupted status
443                    Thread.currentThread().interrupt();
444                    // continue interrupt
445                    throw new RuntimeException(ee);
446                } finally {
447                    if (log.isDebugEnabled()) {
448                        long dtms = System.currentTimeMillis() - t0;
449                        log.debug("stored blob " + digest + " to S3 in " + dtms + "ms");
450                    }
451                }
452            }
453        }
454
455        @Override
456        public boolean fetchFile(String digest, File file) throws IOException {
457            long t0 = 0;
458            if (log.isDebugEnabled()) {
459                t0 = System.currentTimeMillis();
460                log.debug("fetching blob " + digest + " from S3");
461            }
462            try {
463                Download download = transferManager.download(new GetObjectRequest(bucketName, bucketNamePrefix + digest), file);
464                download.waitForCompletion();
465                // Check ETag it is by default MD5 if not multipart
466                if (!isEncrypted && !digest.equals(download.getObjectMetadata().getETag())) {
467                    // In case of multipart it will happen, verify the downloaded file
468                    String currentDigest;
469                    try (FileInputStream input = new FileInputStream(file)) {
470                        currentDigest = DigestUtils.md5Hex(input);
471                    }
472                    if (!currentDigest.equals(digest)) {
473                        log.error("Invalid ETag in S3, currentDigest=" + currentDigest + " expectedDigest=" + digest);
474                        throw new IOException("Invalid S3 object, it is corrupted expected digest is " + digest + " got " + currentDigest);
475                    }
476                }
477                return true;
478            } catch (AmazonClientException e) {
479                if (!isMissingKey(e)) {
480                    throw new IOException(e);
481                }
482                return false;
483            } catch (InterruptedException e) {
484                // reset interrupted status
485                Thread.currentThread().interrupt();
486                // continue interrupt
487                throw new RuntimeException(e);
488            } finally {
489                if (log.isDebugEnabled()) {
490                    long dtms = System.currentTimeMillis() - t0;
491                    log.debug("fetched blob " + digest + " from S3 in " + dtms + "ms");
492                }
493            }
494
495        }
496    }
497
498    /**
499     * Garbage collector for S3 binaries that stores the marked (in use) binaries in memory.
500     */
501    public static class S3BinaryGarbageCollector extends AbstractBinaryGarbageCollector<S3BinaryManager> {
502
503        protected S3BinaryGarbageCollector(S3BinaryManager binaryManager) {
504            super(binaryManager);
505        }
506
507        @Override
508        public String getId() {
509            return "s3:" + binaryManager.bucketName;
510        }
511
512        @Override
513        public Set<String> getUnmarkedBlobs() {
514            // list S3 objects in the bucket
515            // record those not marked
516            Set<String> unmarked = new HashSet<>();
517            ObjectListing list = null;
518            do {
519                if (list == null) {
520                    list = binaryManager.amazonS3.listObjects(binaryManager.bucketName, binaryManager.bucketNamePrefix);
521                } else {
522                    list = binaryManager.amazonS3.listNextBatchOfObjects(list);
523                }
524                int prefixLength = binaryManager.bucketNamePrefix.length();
525                for (S3ObjectSummary summary : list.getObjectSummaries()) {
526                    String digest = summary.getKey().substring(prefixLength);
527                    if (!isMD5(digest)) {
528                        // ignore files that cannot be MD5 digests for
529                        // safety
530                        continue;
531                    }
532                    long length = summary.getSize();
533                    if (marked.contains(digest)) {
534                        status.numBinaries++;
535                        status.sizeBinaries += length;
536                    } else {
537                        status.numBinariesGC++;
538                        status.sizeBinariesGC += length;
539                        // record file to delete
540                        unmarked.add(digest);
541                        marked.remove(digest); // optimize memory
542                    }
543                }
544            } while (list.isTruncated());
545
546            return unmarked;
547        }
548    }
549
550    // ******************** BlobProvider ********************
551
552    @Override
553    protected URI getRemoteUri(String digest, ManagedBlob blob, HttpServletRequest servletRequest) throws IOException {
554        String key = bucketNamePrefix + digest;
555        Date expiration = new Date();
556        expiration.setTime(expiration.getTime() + directDownloadExpire * 1000);
557        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, key, HttpMethod.GET);
558        request.addRequestParameter("response-content-type", getContentTypeHeader(blob));
559        request.addRequestParameter("response-content-disposition", getContentDispositionHeader(blob, null));
560        request.setExpiration(expiration);
561        URL url = amazonS3.generatePresignedUrl(request);
562        try {
563            return url.toURI();
564        } catch (URISyntaxException e) {
565            throw new IOException(e);
566        }
567    }
568
569}