001/*
002 * (C) Copyright 2011-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mathieu Guillaume
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.blob.jclouds;
021
022import static org.apache.commons.lang.StringUtils.isBlank;
023import static org.apache.commons.lang.StringUtils.isNotBlank;
024
025import java.io.File;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.OutputStream;
030import java.net.Authenticator;
031import java.net.PasswordAuthentication;
032import java.util.HashSet;
033import java.util.Map;
034import java.util.Set;
035import java.util.regex.Pattern;
036
037import org.apache.commons.io.IOUtils;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.jclouds.ContextBuilder;
041import org.jclouds.blobstore.BlobStore;
042import org.jclouds.blobstore.BlobStoreContext;
043import org.jclouds.blobstore.domain.Blob;
044import org.jclouds.blobstore.domain.PageSet;
045import org.jclouds.blobstore.domain.StorageMetadata;
046import org.jclouds.blobstore.options.ListContainerOptions;
047import org.jclouds.domain.Location;
048import org.jclouds.domain.LocationBuilder;
049import org.jclouds.domain.LocationScope;
050import org.nuxeo.common.Environment;
051import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
052import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
053import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
054import org.nuxeo.ecm.core.blob.binary.FileStorage;
055import org.nuxeo.runtime.api.Framework;
056
057import com.google.common.hash.Hashing;
058import com.google.common.io.ByteSource;
059import com.google.common.io.Files;
060
061/**
062 * A Binary Manager that stores binaries in cloud blob stores using jclouds.
063 * <p>
064 * The BLOBs are cached locally on first access for efficiency.
065 * <p>
066 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
067 * if accessed before the stream.
068 */
069public class JCloudsBinaryManager extends CachingBinaryManager {
070
    /** Logger shared by this binary manager and its nested classes. */
    private static final Log log = LogFactory.getLog(JCloudsBinaryManager.class);

    /** Configuration key of the jclouds provider id; initialization fails if its value is blank. */
    public static final String BLOBSTORE_PROVIDER_KEY = "jclouds.blobstore.provider";

    /** Configuration key of the container name; initialization fails if its value is blank. */
    public static final String BLOBSTORE_MAP_NAME_KEY = "jclouds.blobstore.name";

    /** Configuration key of the optional location, used as a region id when the container is created. */
    public static final String BLOBSTORE_LOCATION_KEY = "jclouds.blobstore.location";

    /** Configuration key of the optional endpoint given to the jclouds context builder. */
    public static final String BLOBSTORE_ENDPOINT_KEY = "jclouds.blobstore.endpoint";

    /** Default location, {@code null}. */
    public static final String DEFAULT_LOCATION = null;

    /** Configuration key of the identity part of the credentials; initialization fails if its value is blank. */
    public static final String BLOBSTORE_IDENTITY_KEY = "jclouds.blobstore.identity";

    /** Configuration key of the secret part of the credentials; initialization fails if its value is blank. */
    public static final String BLOBSTORE_SECRET_KEY = "jclouds.blobstore.secret";

    /** Configuration key of the local cache size. */
    public static final String CACHE_SIZE_KEY = "jclouds.blobstore.cachesize";

    /** Cache size used when {@link #CACHE_SIZE_KEY} has no value. */
    public static final String DEFAULT_CACHE_SIZE = "100 MB";

    /** Pattern of 32 lowercase hexadecimal characters, matched against whole names by {@link #isMD5(String)}. */
    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    /** Name of the container holding the blobs, read from {@link #BLOBSTORE_MAP_NAME_KEY}. */
    protected String container;

    /** Endpoint read from {@link #BLOBSTORE_ENDPOINT_KEY}; may be blank. */
    protected String endpoint;

    /** Provider id read from {@link #BLOBSTORE_PROVIDER_KEY}. */
    protected String storeProvider;

    /** The jclouds blob store, set during initialization. */
    protected BlobStore blobStore;
100
101    @Override
102    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
103        super.initialize(blobProviderId, properties);
104
105        // Get settings from the configuration
106        storeProvider = getConfigurationProperty(BLOBSTORE_PROVIDER_KEY, properties);
107        if (isBlank(storeProvider)) {
108            throw new RuntimeException("Missing conf: " + BLOBSTORE_PROVIDER_KEY);
109        }
110
111        container = getConfigurationProperty(BLOBSTORE_MAP_NAME_KEY, properties);
112        if (isBlank(container)) {
113            throw new RuntimeException("Missing conf: " + BLOBSTORE_MAP_NAME_KEY);
114        }
115
116        endpoint = getConfigurationProperty(BLOBSTORE_ENDPOINT_KEY, properties);
117
118        String storeLocation = getConfigurationProperty(BLOBSTORE_LOCATION_KEY, properties);
119        if (isBlank(storeLocation)) {
120            storeLocation = null;
121        }
122
123        String storeIdentity = getConfigurationProperty(BLOBSTORE_IDENTITY_KEY, properties);
124        if (isBlank(storeIdentity)) {
125            throw new RuntimeException("Missing conf: " + BLOBSTORE_IDENTITY_KEY);
126        }
127
128        String storeSecret = getConfigurationProperty(BLOBSTORE_SECRET_KEY, properties);
129        if (isBlank(storeSecret)) {
130            throw new RuntimeException("Missing conf: " + BLOBSTORE_SECRET_KEY);
131        }
132
133        String cacheSizeStr = getConfigurationProperty(CACHE_SIZE_KEY, properties);
134        if (isBlank(cacheSizeStr)) {
135            cacheSizeStr = DEFAULT_CACHE_SIZE;
136        }
137
138        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
139        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
140        final String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
141        final String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);
142
143        // Set up proxy
144        if (isNotBlank(proxyHost)) {
145            System.setProperty("https.proxyHost", proxyHost);
146        }
147        if (isNotBlank(proxyPort)) {
148            System.setProperty("https.proxyPort", proxyPort);
149        }
150        if (isNotBlank(proxyLogin)) {
151            System.setProperty("https.proxyUser", proxyLogin);
152            System.setProperty("https.proxyPassword", proxyPassword);
153            Authenticator.setDefault(new Authenticator() {
154                @Override
155                public PasswordAuthentication getPasswordAuthentication() {
156                    return new PasswordAuthentication(proxyLogin, proxyPassword.toCharArray());
157                }
158            });
159        }
160
161        ContextBuilder builder = ContextBuilder.newBuilder(storeProvider).credentials(storeIdentity, storeSecret);
162
163        if (isNotBlank(endpoint)) {
164            builder.endpoint(endpoint);
165        }
166
167        BlobStoreContext context = builder.buildView(BlobStoreContext.class);
168
169        // Try to create container if it doesn't exist
170        blobStore = context.getBlobStore();
171        boolean created = false;
172        if (storeLocation == null) {
173            created = blobStore.createContainerInLocation(null, container);
174        } else {
175            Location location = new LocationBuilder().scope(LocationScope.REGION)
176                                                     .id(storeLocation)
177                                                     .description(storeLocation)
178                                                     .build();
179            created = blobStore.createContainerInLocation(location, container);
180        }
181        if (created) {
182            log.debug("Created container " + container);
183        }
184
185        // Create file cache
186        initializeCache(cacheSizeStr, new JCloudsFileStorage());
187        createGarbageCollector();
188    }
189
190    // Get a property based first on the value in the properties map, then
191    // from the system properties.
192    private String getConfigurationProperty(String key, Map<String, String> properties) {
193        String value = properties.get(key);
194        if (isBlank(value)) {
195            value = Framework.getProperty(key);
196        }
197        return value;
198    }
199
200    protected void createGarbageCollector() {
201        garbageCollector = new JCloudsBinaryGarbageCollector(this);
202    }
203
204    protected void removeBinary(String digest) {
205        blobStore.removeBlob(container, digest);
206    }
207
208    public static boolean isMD5(String digest) {
209        return MD5_RE.matcher(digest).matches();
210    }
211
212    public class JCloudsFileStorage implements FileStorage {
213
214        @Override
215        public void storeFile(String digest, File file) throws IOException {
216            Blob currentObject;
217            try {
218                currentObject = blobStore.getBlob(container, digest);
219            } catch (Exception e) {
220                throw new IOException("Unable to check existence of binary", e);
221            }
222            if (currentObject == null) {
223                // no data, store the blob
224                ByteSource byteSource = Files.asByteSource(file);
225                Blob remoteBlob = blobStore.blobBuilder(digest)
226                                           .payload(byteSource)
227                                           .contentLength(byteSource.size())
228                                           .contentMD5(byteSource.hash(Hashing.md5()))
229                                           .build();
230                try {
231                    blobStore.putBlob(container, remoteBlob);
232                } catch (Exception e) {
233                    throw new IOException("Unable to store binary", e);
234                }
235                // validate storage
236                // TODO only check presence and size/md5
237                Blob checkBlob;
238                try {
239                    checkBlob = blobStore.getBlob(container, digest);
240                } catch (Exception e) {
241                    try {
242                        // Remote blob can't be validated - remove it
243                        blobStore.removeBlob(container, digest);
244                    } catch (Exception e2) {
245                        log.error("Possible data corruption : binary " + digest
246                                + " validation failed but it could not be removed.");
247                    }
248                    throw new IOException("Unable to validate stored binary", e);
249                }
250                if (checkBlob == null || !remoteBlob.getMetadata().getContentMetadata().getContentLength().equals(
251                        checkBlob.getMetadata().getContentMetadata().getContentLength())) {
252                    if (checkBlob != null) {
253                        // Remote blob is incomplete - remove it
254                        try {
255                            blobStore.removeBlob(container, digest);
256                        } catch (Exception e2) {
257                            log.error("Possible data corruption : binary " + digest
258                                    + " validation failed but it could not be removed.");
259                        }
260                    }
261                    throw new IOException("Upload to blob store failed");
262                }
263            }
264        }
265
266        @Override
267        public boolean fetchFile(String digest, File tmp) {
268            Blob remoteBlob;
269            try {
270                remoteBlob = blobStore.getBlob(container, digest);
271            } catch (Exception e) {
272                log.error("Could not cache binary from remote storage: " + digest, e);
273                return false;
274            }
275            if (remoteBlob == null) {
276                log.error("Unknown binary: " + digest);
277                return false;
278            } else {
279                InputStream remoteStream = remoteBlob.getPayload().getInput();
280                OutputStream localStream = null;
281                try {
282                    localStream = new FileOutputStream(tmp);
283                    IOUtils.copy(remoteStream, localStream);
284                } catch (IOException e) {
285                    log.error("Unable to cache binary from remote storage: " + digest, e);
286                    return false;
287                } finally {
288                    IOUtils.closeQuietly(remoteStream);
289                    IOUtils.closeQuietly(localStream);
290                }
291            }
292            return true;
293        }
294    }
295
296    /**
297     * Garbage collector for the blobstore binaries that stores the marked (in use) binaries in memory.
298     */
299    public static class JCloudsBinaryGarbageCollector implements BinaryGarbageCollector {
300
301        protected final JCloudsBinaryManager binaryManager;
302
303        protected volatile long startTime;
304
305        protected BinaryManagerStatus status;
306
307        protected Set<String> marked;
308
309        public JCloudsBinaryGarbageCollector(JCloudsBinaryManager binaryManager) {
310            this.binaryManager = binaryManager;
311        }
312
313        @Override
314        public String getId() {
315            return "jclouds/" + binaryManager.storeProvider + ":" + binaryManager.container;
316        }
317
318        @Override
319        public BinaryManagerStatus getStatus() {
320            return status;
321        }
322
323        @Override
324        public boolean isInProgress() {
325            // volatile as this is designed to be called from another thread
326            return startTime != 0;
327        }
328
329        @Override
330        public void start() {
331            if (startTime != 0) {
332                throw new RuntimeException("Alread started");
333            }
334            startTime = System.currentTimeMillis();
335            status = new BinaryManagerStatus();
336            marked = new HashSet<>();
337        }
338
339        @Override
340        public void mark(String digest) {
341            marked.add(digest);
342        }
343
344        @Override
345        public void stop(boolean delete) {
346            if (startTime == 0) {
347                throw new RuntimeException("Not started");
348            }
349
350            Set<String> unmarked = new HashSet<>();
351            ListContainerOptions options = ListContainerOptions.NONE;
352            for (;;) {
353                PageSet<? extends StorageMetadata> metadatas = binaryManager.blobStore.list(binaryManager.container,
354                        options);
355                for (StorageMetadata metadata : metadatas) {
356                    String digest = metadata.getName();
357                    if (!isMD5(digest)) {
358                        // ignore files that cannot be MD5 digests for safety
359                        continue;
360                    }
361                    // TODO size in metadata available only in upcoming JClouds 1.9.0 (JCLOUDS-654)
362                    if (marked.contains(digest)) {
363                        status.numBinaries++;
364                        // status.sizeBinaries += size;
365                    } else {
366                        status.numBinariesGC++;
367                        // status.sizeBinariesGC += size;
368                        // record file to delete
369                        unmarked.add(digest);
370                        marked.remove(digest); // optimize memory
371                    }
372                }
373                String marker = metadatas.getNextMarker();
374                if (marker == null) {
375                    break;
376                }
377                options = ListContainerOptions.Builder.afterMarker(marker);
378            }
379            marked = null; // help GC
380
381            // delete unmarked objects
382            if (delete) {
383                for (String digest : unmarked) {
384                    binaryManager.removeBinary(digest);
385                }
386            }
387
388            status.gcDuration = System.currentTimeMillis() - startTime;
389            startTime = 0;
390        }
391    }
392
393}