001/*
002 * (C) Copyright 2011-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mathieu Guillaume
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.blob.jclouds;
021
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.runtime.api.Framework;

import com.google.common.hash.Hashing;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
060
061/**
062 * A Binary Manager that stores binaries in cloud blob stores using jclouds.
063 * <p>
064 * The BLOBs are cached locally on first access for efficiency.
065 * <p>
066 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
067 * if accessed before the stream.
068 */
069public class JCloudsBinaryManager extends CachingBinaryManager {
070
071    private static final Log log = LogFactory.getLog(JCloudsBinaryManager.class);
072
073    public static final String BLOBSTORE_PROVIDER_KEY = "jclouds.blobstore.provider";
074
075    public static final String BLOBSTORE_MAP_NAME_KEY = "jclouds.blobstore.name";
076
077    public static final String BLOBSTORE_LOCATION_KEY = "jclouds.blobstore.location";
078
079    public static final String DEFAULT_LOCATION = null;
080
081    public static final String BLOBSTORE_IDENTITY_KEY = "jclouds.blobstore.identity";
082
083    public static final String BLOBSTORE_SECRET_KEY = "jclouds.blobstore.secret";
084
085    public static final String CACHE_SIZE_KEY = "jclouds.blobstore.cachesize";
086
087    public static final String DEFAULT_CACHE_SIZE = "100 MB";
088
089    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");
090
091    protected String container;
092
093    protected String storeProvider;
094
095    protected BlobStore blobStore;
096
097    @Override
098    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
099        super.initialize(blobProviderId, properties);
100
101        // Get settings from the configuration
102        // TODO parse properties too
103        storeProvider = Framework.getProperty(BLOBSTORE_PROVIDER_KEY);
104        if (isBlank(storeProvider)) {
105            throw new RuntimeException("Missing conf: " + BLOBSTORE_PROVIDER_KEY);
106        }
107        container = Framework.getProperty(BLOBSTORE_MAP_NAME_KEY);
108        if (isBlank(container)) {
109            throw new RuntimeException("Missing conf: " + BLOBSTORE_MAP_NAME_KEY);
110        }
111
112        String storeLocation = Framework.getProperty(BLOBSTORE_LOCATION_KEY);
113        if (isBlank(storeLocation)) {
114            storeLocation = null;
115        }
116
117        String storeIdentity = Framework.getProperty(BLOBSTORE_IDENTITY_KEY);
118        if (isBlank(storeIdentity)) {
119            throw new RuntimeException("Missing conf: " + BLOBSTORE_IDENTITY_KEY);
120        }
121
122        String storeSecret = Framework.getProperty(BLOBSTORE_SECRET_KEY);
123        if (isBlank(storeSecret)) {
124            throw new RuntimeException("Missing conf: " + BLOBSTORE_SECRET_KEY);
125        }
126
127        String cacheSizeStr = Framework.getProperty(CACHE_SIZE_KEY);
128        if (isBlank(cacheSizeStr)) {
129            cacheSizeStr = DEFAULT_CACHE_SIZE;
130        }
131
132        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
133        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
134        final String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
135        final String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);
136
137        // Set up proxy
138        if (isNotBlank(proxyHost)) {
139            System.setProperty("https.proxyHost", proxyHost);
140        }
141        if (isNotBlank(proxyPort)) {
142            System.setProperty("https.proxyPort", proxyPort);
143        }
144        if (isNotBlank(proxyLogin)) {
145            System.setProperty("https.proxyUser", proxyLogin);
146            System.setProperty("https.proxyPassword", proxyPassword);
147            Authenticator.setDefault(new Authenticator() {
148                @Override
149                public PasswordAuthentication getPasswordAuthentication() {
150                    return new PasswordAuthentication(proxyLogin, proxyPassword.toCharArray());
151                }
152            });
153        }
154
155        BlobStoreContext context = ContextBuilder.newBuilder(storeProvider).credentials(storeIdentity, storeSecret).buildView(
156                BlobStoreContext.class);
157
158        // Try to create container if it doesn't exist
159        blobStore = context.getBlobStore();
160        boolean created = false;
161        if (storeLocation == null) {
162            created = blobStore.createContainerInLocation(null, container);
163        } else {
164            Location location = new LocationBuilder().scope(LocationScope.REGION).id(storeLocation).description(
165                    storeLocation).build();
166            created = blobStore.createContainerInLocation(location, container);
167        }
168        if (created) {
169            log.debug("Created container " + container);
170        }
171
172        // Create file cache
173        initializeCache(cacheSizeStr, new JCloudsFileStorage());
174        createGarbageCollector();
175    }
176
177    protected void createGarbageCollector() {
178        garbageCollector = new JCloudsBinaryGarbageCollector(this);
179    }
180
181    protected void removeBinary(String digest) {
182        blobStore.removeBlob(container, digest);
183    }
184
185    public static boolean isMD5(String digest) {
186        return MD5_RE.matcher(digest).matches();
187    }
188
189    public class JCloudsFileStorage implements FileStorage {
190
191        @Override
192        public void storeFile(String digest, File file) throws IOException {
193            Blob currentObject;
194            try {
195                currentObject = blobStore.getBlob(container, digest);
196            } catch (Exception e) {
197                throw new IOException("Unable to check existence of binary", e);
198            }
199            if (currentObject == null) {
200                // no data, store the blob
201                ByteSource byteSource = Files.asByteSource(file);
202                Blob remoteBlob = blobStore.blobBuilder(digest).payload(byteSource).contentLength(byteSource.size()).contentMD5(
203                        byteSource.hash(Hashing.md5())).build();
204                try {
205                    blobStore.putBlob(container, remoteBlob);
206                } catch (Exception e) {
207                    throw new IOException("Unable to store binary", e);
208                }
209                // validate storage
210                // TODO only check presence and size/md5
211                Blob checkBlob;
212                try {
213                    checkBlob = blobStore.getBlob(container, digest);
214                } catch (Exception e) {
215                    try {
216                        // Remote blob can't be validated - remove it
217                        blobStore.removeBlob(container, digest);
218                    } catch (Exception e2) {
219                        log.error("Possible data corruption : binary " + digest
220                                + " validation failed but it could not be removed.");
221                    }
222                    throw new IOException("Unable to validate stored binary", e);
223                }
224                if (checkBlob == null
225                        || remoteBlob.getMetadata().getContentMetadata().getContentLength() != checkBlob.getMetadata().getContentMetadata().getContentLength()) {
226                    if (checkBlob != null) {
227                        // Remote blob is incomplete - remove it
228                        try {
229                            blobStore.removeBlob(container, digest);
230                        } catch (Exception e2) {
231                            log.error("Possible data corruption : binary " + digest
232                                    + " validation failed but it could not be removed.");
233                        }
234                    }
235                    throw new IOException("Upload to blob store failed");
236                }
237            }
238        }
239
240        @Override
241        public boolean fetchFile(String digest, File tmp) {
242            Blob remoteBlob;
243            try {
244                remoteBlob = blobStore.getBlob(container, digest);
245            } catch (Exception e) {
246                log.error("Could not cache binary from remote storage: " + digest, e);
247                return false;
248            }
249            if (remoteBlob == null) {
250                log.error("Unknown binary: " + digest);
251                return false;
252            } else {
253                InputStream remoteStream = remoteBlob.getPayload().getInput();
254                OutputStream localStream = null;
255                try {
256                    localStream = new FileOutputStream(tmp);
257                    IOUtils.copy(remoteStream, localStream);
258                } catch (IOException e) {
259                    log.error("Unable to cache binary from remote storage: " + digest, e);
260                    return false;
261                } finally {
262                    IOUtils.closeQuietly(remoteStream);
263                    IOUtils.closeQuietly(localStream);
264                }
265            }
266            return true;
267        }
268
269        @Override
270        public Long fetchLength(String digest) {
271            Blob remoteBlob;
272            try {
273                remoteBlob = blobStore.getBlob(container, digest);
274            } catch (Exception e) {
275                log.error("Unable to fetch binary information from remote storage");
276                return null;
277            }
278            if (remoteBlob == null) {
279                return null;
280            } else {
281                return remoteBlob.getMetadata().getContentMetadata().getContentLength();
282            }
283        }
284
285    }
286
287    /**
288     * Garbage collector for the blobstore binaries that stores the marked (in use) binaries in memory.
289     */
290    public static class JCloudsBinaryGarbageCollector implements BinaryGarbageCollector {
291
292        protected final JCloudsBinaryManager binaryManager;
293
294        protected volatile long startTime;
295
296        protected BinaryManagerStatus status;
297
298        protected Set<String> marked;
299
300        public JCloudsBinaryGarbageCollector(JCloudsBinaryManager binaryManager) {
301            this.binaryManager = binaryManager;
302        }
303
304        @Override
305        public String getId() {
306            return "jclouds/" + binaryManager.storeProvider + ":" + binaryManager.container;
307        }
308
309        @Override
310        public BinaryManagerStatus getStatus() {
311            return status;
312        }
313
314        @Override
315        public boolean isInProgress() {
316            // volatile as this is designed to be called from another thread
317            return startTime != 0;
318        }
319
320        @Override
321        public void start() {
322            if (startTime != 0) {
323                throw new RuntimeException("Alread started");
324            }
325            startTime = System.currentTimeMillis();
326            status = new BinaryManagerStatus();
327            marked = new HashSet<>();
328        }
329
330        @Override
331        public void mark(String digest) {
332            marked.add(digest);
333        }
334
335        @Override
336        public void stop(boolean delete) {
337            if (startTime == 0) {
338                throw new RuntimeException("Not started");
339            }
340
341            Set<String> unmarked = new HashSet<>();
342            ListContainerOptions options = ListContainerOptions.NONE;
343            for (;;) {
344                PageSet<? extends StorageMetadata> metadatas = binaryManager.blobStore.list(binaryManager.container, options);
345                for (StorageMetadata metadata : metadatas) {
346                    String digest = metadata.getName();
347                    if (!isMD5(digest)) {
348                        // ignore files that cannot be MD5 digests for safety
349                        continue;
350                    }
351                    // TODO size in metadata available only in upcoming JClouds 1.9.0 (JCLOUDS-654)
352                    if (marked.contains(digest)) {
353                        status.numBinaries++;
354                        // status.sizeBinaries += size;
355                    } else {
356                        status.numBinariesGC++;
357                        // status.sizeBinariesGC += size;
358                        // record file to delete
359                        unmarked.add(digest);
360                        marked.remove(digest); // optimize memory
361                    }
362                }
363                String marker = metadatas.getNextMarker();
364                if (marker == null) {
365                    break;
366                }
367                options = ListContainerOptions.Builder.afterMarker(marker);
368            }
369            marked = null; // help GC
370
371            // delete unmarked objects
372            if (delete) {
373                for (String digest : unmarked) {
374                    binaryManager.removeBinary(digest);
375                }
376            }
377
378            status.gcDuration = System.currentTimeMillis() - startTime;
379            startTime = 0;
380        }
381    }
382
383}