001/*
002 * (C) Copyright 2011-2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Mathieu Guillaume
016 *     Florent Guillaume
017 */
018package org.nuxeo.ecm.blob.jclouds;
019
020import static org.apache.commons.lang.StringUtils.isBlank;
021import static org.apache.commons.lang.StringUtils.isNotBlank;
022
023import java.io.File;
024import java.io.FileOutputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.net.Authenticator;
029import java.net.PasswordAuthentication;
030import java.util.HashSet;
031import java.util.Map;
032import java.util.Set;
033import java.util.regex.Pattern;
034
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.jclouds.ContextBuilder;
039import org.jclouds.blobstore.BlobStore;
040import org.jclouds.blobstore.BlobStoreContext;
041import org.jclouds.blobstore.domain.Blob;
042import org.jclouds.blobstore.domain.PageSet;
043import org.jclouds.blobstore.domain.StorageMetadata;
044import org.jclouds.blobstore.options.ListContainerOptions;
045import org.jclouds.domain.Location;
046import org.jclouds.domain.LocationBuilder;
047import org.jclouds.domain.LocationScope;
048import org.nuxeo.common.Environment;
049import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
050import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
051import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
052import org.nuxeo.ecm.core.blob.binary.FileStorage;
053import org.nuxeo.runtime.api.Framework;
054
055import com.google.common.hash.Hashing;
056import com.google.common.io.ByteSource;
057import com.google.common.io.Files;
058
059/**
060 * A Binary Manager that stores binaries in cloud blob stores using jclouds.
061 * <p>
062 * The BLOBs are cached locally on first access for efficiency.
063 * <p>
064 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
065 * if accessed before the stream.
066 */
067public class JCloudsBinaryManager extends CachingBinaryManager {
068
069    private static final Log log = LogFactory.getLog(JCloudsBinaryManager.class);
070
071    public static final String BLOBSTORE_PROVIDER_KEY = "jclouds.blobstore.provider";
072
073    public static final String BLOBSTORE_MAP_NAME_KEY = "jclouds.blobstore.name";
074
075    public static final String BLOBSTORE_LOCATION_KEY = "jclouds.blobstore.location";
076
077    public static final String DEFAULT_LOCATION = null;
078
079    public static final String BLOBSTORE_IDENTITY_KEY = "jclouds.blobstore.identity";
080
081    public static final String BLOBSTORE_SECRET_KEY = "jclouds.blobstore.secret";
082
083    public static final String CACHE_SIZE_KEY = "jclouds.blobstore.cachesize";
084
085    public static final String DEFAULT_CACHE_SIZE = "100 MB";
086
087    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");
088
089    protected String container;
090
091    protected String storeProvider;
092
093    protected BlobStore blobStore;
094
095    @Override
096    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
097        super.initialize(blobProviderId, properties);
098
099        // Get settings from the configuration
100        // TODO parse properties too
101        storeProvider = Framework.getProperty(BLOBSTORE_PROVIDER_KEY);
102        if (isBlank(storeProvider)) {
103            throw new RuntimeException("Missing conf: " + BLOBSTORE_PROVIDER_KEY);
104        }
105        container = Framework.getProperty(BLOBSTORE_MAP_NAME_KEY);
106        if (isBlank(container)) {
107            throw new RuntimeException("Missing conf: " + BLOBSTORE_MAP_NAME_KEY);
108        }
109
110        String storeLocation = Framework.getProperty(BLOBSTORE_LOCATION_KEY);
111        if (isBlank(storeLocation)) {
112            storeLocation = null;
113        }
114
115        String storeIdentity = Framework.getProperty(BLOBSTORE_IDENTITY_KEY);
116        if (isBlank(storeIdentity)) {
117            throw new RuntimeException("Missing conf: " + BLOBSTORE_IDENTITY_KEY);
118        }
119
120        String storeSecret = Framework.getProperty(BLOBSTORE_SECRET_KEY);
121        if (isBlank(storeSecret)) {
122            throw new RuntimeException("Missing conf: " + BLOBSTORE_SECRET_KEY);
123        }
124
125        String cacheSizeStr = Framework.getProperty(CACHE_SIZE_KEY);
126        if (isBlank(cacheSizeStr)) {
127            cacheSizeStr = DEFAULT_CACHE_SIZE;
128        }
129
130        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
131        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
132        final String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
133        final String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);
134
135        // Set up proxy
136        if (isNotBlank(proxyHost)) {
137            System.setProperty("https.proxyHost", proxyHost);
138        }
139        if (isNotBlank(proxyPort)) {
140            System.setProperty("https.proxyPort", proxyPort);
141        }
142        if (isNotBlank(proxyLogin)) {
143            System.setProperty("https.proxyUser", proxyLogin);
144            System.setProperty("https.proxyPassword", proxyPassword);
145            Authenticator.setDefault(new Authenticator() {
146                @Override
147                public PasswordAuthentication getPasswordAuthentication() {
148                    return new PasswordAuthentication(proxyLogin, proxyPassword.toCharArray());
149                }
150            });
151        }
152
153        BlobStoreContext context = ContextBuilder.newBuilder(storeProvider).credentials(storeIdentity, storeSecret).buildView(
154                BlobStoreContext.class);
155
156        // Try to create container if it doesn't exist
157        blobStore = context.getBlobStore();
158        boolean created = false;
159        if (storeLocation == null) {
160            created = blobStore.createContainerInLocation(null, container);
161        } else {
162            Location location = new LocationBuilder().scope(LocationScope.REGION).id(storeLocation).description(
163                    storeLocation).build();
164            created = blobStore.createContainerInLocation(location, container);
165        }
166        if (created) {
167            log.debug("Created container " + container);
168        }
169
170        // Create file cache
171        initializeCache(cacheSizeStr, new JCloudsFileStorage());
172        createGarbageCollector();
173    }
174
175    protected void createGarbageCollector() {
176        garbageCollector = new JCloudsBinaryGarbageCollector(this);
177    }
178
179    protected void removeBinary(String digest) {
180        blobStore.removeBlob(container, digest);
181    }
182
183    public static boolean isMD5(String digest) {
184        return MD5_RE.matcher(digest).matches();
185    }
186
187    public class JCloudsFileStorage implements FileStorage {
188
189        @Override
190        public void storeFile(String digest, File file) throws IOException {
191            Blob currentObject;
192            try {
193                currentObject = blobStore.getBlob(container, digest);
194            } catch (Exception e) {
195                throw new IOException("Unable to check existence of binary", e);
196            }
197            if (currentObject == null) {
198                // no data, store the blob
199                ByteSource byteSource = Files.asByteSource(file);
200                Blob remoteBlob = blobStore.blobBuilder(digest).payload(byteSource).contentLength(byteSource.size()).contentMD5(
201                        byteSource.hash(Hashing.md5())).build();
202                try {
203                    blobStore.putBlob(container, remoteBlob);
204                } catch (Exception e) {
205                    throw new IOException("Unable to store binary", e);
206                }
207                // validate storage
208                // TODO only check presence and size/md5
209                Blob checkBlob;
210                try {
211                    checkBlob = blobStore.getBlob(container, digest);
212                } catch (Exception e) {
213                    try {
214                        // Remote blob can't be validated - remove it
215                        blobStore.removeBlob(container, digest);
216                    } catch (Exception e2) {
217                        log.error("Possible data corruption : binary " + digest
218                                + " validation failed but it could not be removed.");
219                    }
220                    throw new IOException("Unable to validate stored binary", e);
221                }
222                if (checkBlob == null
223                        || remoteBlob.getMetadata().getContentMetadata().getContentLength() != checkBlob.getMetadata().getContentMetadata().getContentLength()) {
224                    if (checkBlob != null) {
225                        // Remote blob is incomplete - remove it
226                        try {
227                            blobStore.removeBlob(container, digest);
228                        } catch (Exception e2) {
229                            log.error("Possible data corruption : binary " + digest
230                                    + " validation failed but it could not be removed.");
231                        }
232                    }
233                    throw new IOException("Upload to blob store failed");
234                }
235            }
236        }
237
238        @Override
239        public boolean fetchFile(String digest, File tmp) {
240            Blob remoteBlob;
241            try {
242                remoteBlob = blobStore.getBlob(container, digest);
243            } catch (Exception e) {
244                log.error("Could not cache binary from remote storage: " + digest, e);
245                return false;
246            }
247            if (remoteBlob == null) {
248                log.error("Unknown binary: " + digest);
249                return false;
250            } else {
251                InputStream remoteStream = remoteBlob.getPayload().getInput();
252                OutputStream localStream = null;
253                try {
254                    localStream = new FileOutputStream(tmp);
255                    IOUtils.copy(remoteStream, localStream);
256                } catch (IOException e) {
257                    log.error("Unable to cache binary from remote storage: " + digest, e);
258                    return false;
259                } finally {
260                    IOUtils.closeQuietly(remoteStream);
261                    IOUtils.closeQuietly(localStream);
262                }
263            }
264            return true;
265        }
266
267        @Override
268        public Long fetchLength(String digest) {
269            Blob remoteBlob;
270            try {
271                remoteBlob = blobStore.getBlob(container, digest);
272            } catch (Exception e) {
273                log.error("Unable to fetch binary information from remote storage");
274                return null;
275            }
276            if (remoteBlob == null) {
277                return null;
278            } else {
279                return remoteBlob.getMetadata().getContentMetadata().getContentLength();
280            }
281        }
282
283    }
284
285    /**
286     * Garbage collector for the blobstore binaries that stores the marked (in use) binaries in memory.
287     */
288    public static class JCloudsBinaryGarbageCollector implements BinaryGarbageCollector {
289
290        protected final JCloudsBinaryManager binaryManager;
291
292        protected volatile long startTime;
293
294        protected BinaryManagerStatus status;
295
296        protected Set<String> marked;
297
298        public JCloudsBinaryGarbageCollector(JCloudsBinaryManager binaryManager) {
299            this.binaryManager = binaryManager;
300        }
301
302        @Override
303        public String getId() {
304            return "jclouds/" + binaryManager.storeProvider + ":" + binaryManager.container;
305        }
306
307        @Override
308        public BinaryManagerStatus getStatus() {
309            return status;
310        }
311
312        @Override
313        public boolean isInProgress() {
314            // volatile as this is designed to be called from another thread
315            return startTime != 0;
316        }
317
318        @Override
319        public void start() {
320            if (startTime != 0) {
321                throw new RuntimeException("Alread started");
322            }
323            startTime = System.currentTimeMillis();
324            status = new BinaryManagerStatus();
325            marked = new HashSet<>();
326        }
327
328        @Override
329        public void mark(String digest) {
330            marked.add(digest);
331        }
332
333        @Override
334        public void stop(boolean delete) {
335            if (startTime == 0) {
336                throw new RuntimeException("Not started");
337            }
338
339            Set<String> unmarked = new HashSet<>();
340            ListContainerOptions options = ListContainerOptions.NONE;
341            for (;;) {
342                PageSet<? extends StorageMetadata> metadatas = binaryManager.blobStore.list(binaryManager.container, options);
343                for (StorageMetadata metadata : metadatas) {
344                    String digest = metadata.getName();
345                    if (!isMD5(digest)) {
346                        // ignore files that cannot be MD5 digests for safety
347                        continue;
348                    }
349                    // TODO size in metadata available only in upcoming JClouds 1.9.0 (JCLOUDS-654)
350                    if (marked.contains(digest)) {
351                        status.numBinaries++;
352                        // status.sizeBinaries += size;
353                    } else {
354                        status.numBinariesGC++;
355                        // status.sizeBinariesGC += size;
356                        // record file to delete
357                        unmarked.add(digest);
358                        marked.remove(digest); // optimize memory
359                    }
360                }
361                String marker = metadatas.getNextMarker();
362                if (marker == null) {
363                    break;
364                }
365                options = ListContainerOptions.Builder.afterMarker(marker);
366            }
367            marked = null; // help GC
368
369            // delete unmarked objects
370            if (delete) {
371                for (String digest : unmarked) {
372                    binaryManager.removeBinary(digest);
373                }
374            }
375
376            status.gcDuration = System.currentTimeMillis() - startTime;
377            startTime = 0;
378        }
379    }
380
381}