001/* 002 * (C) Copyright 2011-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mathieu Guillaume 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.blob.jclouds; 021 022import static org.apache.commons.lang.StringUtils.isBlank; 023import static org.apache.commons.lang.StringUtils.isNotBlank; 024 025import java.io.File; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.OutputStream; 030import java.net.Authenticator; 031import java.net.PasswordAuthentication; 032import java.util.HashSet; 033import java.util.Map; 034import java.util.Set; 035import java.util.regex.Pattern; 036 037import org.apache.commons.io.IOUtils; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.jclouds.ContextBuilder; 041import org.jclouds.blobstore.BlobStore; 042import org.jclouds.blobstore.BlobStoreContext; 043import org.jclouds.blobstore.domain.Blob; 044import org.jclouds.blobstore.domain.PageSet; 045import org.jclouds.blobstore.domain.StorageMetadata; 046import org.jclouds.blobstore.options.ListContainerOptions; 047import org.jclouds.domain.Location; 048import org.jclouds.domain.LocationBuilder; 049import org.jclouds.domain.LocationScope; 050import org.nuxeo.common.Environment; 051import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 052import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 053import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager; 054import org.nuxeo.ecm.core.blob.binary.FileStorage; 055import org.nuxeo.runtime.api.Framework; 056 057import com.google.common.hash.Hashing; 058import com.google.common.io.ByteSource; 059import com.google.common.io.Files; 060 061/** 062 * A Binary Manager that stores binaries in cloud blob stores using jclouds. 063 * <p> 064 * The BLOBs are cached locally on first access for efficiency. 065 * <p> 066 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 067 * if accessed before the stream. 068 */ 069public class JCloudsBinaryManager extends CachingBinaryManager { 070 071 private static final Log log = LogFactory.getLog(JCloudsBinaryManager.class); 072 073 public static final String BLOBSTORE_PROVIDER_KEY = "jclouds.blobstore.provider"; 074 075 public static final String BLOBSTORE_MAP_NAME_KEY = "jclouds.blobstore.name"; 076 077 public static final String BLOBSTORE_LOCATION_KEY = "jclouds.blobstore.location"; 078 079 public static final String DEFAULT_LOCATION = null; 080 081 public static final String BLOBSTORE_IDENTITY_KEY = "jclouds.blobstore.identity"; 082 083 public static final String BLOBSTORE_SECRET_KEY = "jclouds.blobstore.secret"; 084 085 public static final String CACHE_SIZE_KEY = "jclouds.blobstore.cachesize"; 086 087 public static final String DEFAULT_CACHE_SIZE = "100 MB"; 088 089 private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}"); 090 091 protected String container; 092 093 protected String storeProvider; 094 095 protected BlobStore blobStore; 096 097 @Override 098 public void initialize(String blobProviderId, Map<String, String> properties) throws IOException { 099 super.initialize(blobProviderId, properties); 100 101 // Get settings from the configuration 102 // TODO parse properties too 103 storeProvider = Framework.getProperty(BLOBSTORE_PROVIDER_KEY); 104 if (isBlank(storeProvider)) { 105 throw new RuntimeException("Missing conf: " + BLOBSTORE_PROVIDER_KEY); 106 } 107 container = Framework.getProperty(BLOBSTORE_MAP_NAME_KEY); 108 if (isBlank(container)) { 109 throw new RuntimeException("Missing conf: " + BLOBSTORE_MAP_NAME_KEY); 110 } 111 112 String storeLocation = Framework.getProperty(BLOBSTORE_LOCATION_KEY); 113 if (isBlank(storeLocation)) { 114 storeLocation = null; 115 } 116 117 String storeIdentity = Framework.getProperty(BLOBSTORE_IDENTITY_KEY); 118 if (isBlank(storeIdentity)) { 119 throw new RuntimeException("Missing conf: " + BLOBSTORE_IDENTITY_KEY); 120 } 121 122 String storeSecret = Framework.getProperty(BLOBSTORE_SECRET_KEY); 123 if (isBlank(storeSecret)) { 124 throw new RuntimeException("Missing conf: " + BLOBSTORE_SECRET_KEY); 125 } 126 127 String cacheSizeStr = Framework.getProperty(CACHE_SIZE_KEY); 128 if (isBlank(cacheSizeStr)) { 129 cacheSizeStr = DEFAULT_CACHE_SIZE; 130 } 131 132 String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST); 133 String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT); 134 final String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN); 135 final String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD); 136 137 // Set up proxy 138 if (isNotBlank(proxyHost)) { 139 System.setProperty("https.proxyHost", proxyHost); 140 } 141 if (isNotBlank(proxyPort)) { 142 System.setProperty("https.proxyPort", proxyPort); 143 } 144 if (isNotBlank(proxyLogin)) { 145 System.setProperty("https.proxyUser", proxyLogin); 146 System.setProperty("https.proxyPassword", proxyPassword); 147 Authenticator.setDefault(new Authenticator() { 148 @Override 149 public PasswordAuthentication getPasswordAuthentication() { 150 return new PasswordAuthentication(proxyLogin, proxyPassword.toCharArray()); 151 } 152 }); 153 } 154 155 BlobStoreContext context = ContextBuilder.newBuilder(storeProvider).credentials(storeIdentity, storeSecret).buildView( 156 BlobStoreContext.class); 157 158 // Try to create container if it doesn't exist 159 blobStore = context.getBlobStore(); 160 boolean created = false; 161 if (storeLocation == null) { 162 created = blobStore.createContainerInLocation(null, container); 163 } else { 164 Location location = new LocationBuilder().scope(LocationScope.REGION).id(storeLocation).description( 165 storeLocation).build(); 166 created = blobStore.createContainerInLocation(location, container); 167 } 168 if (created) { 169 log.debug("Created container " + container); 170 } 171 172 // Create file cache 173 initializeCache(cacheSizeStr, new JCloudsFileStorage()); 174 createGarbageCollector(); 175 } 176 177 protected void createGarbageCollector() { 178 garbageCollector = new JCloudsBinaryGarbageCollector(this); 179 } 180 181 protected void removeBinary(String digest) { 182 blobStore.removeBlob(container, digest); 183 } 184 185 public static boolean isMD5(String digest) { 186 return MD5_RE.matcher(digest).matches(); 187 } 188 189 public class JCloudsFileStorage implements FileStorage { 190 191 @Override 192 public void storeFile(String digest, File file) throws IOException { 193 Blob currentObject; 194 try { 195 currentObject = blobStore.getBlob(container, digest); 196 } catch (Exception e) { 197 throw new IOException("Unable to check existence of binary", e); 198 } 199 if (currentObject == null) { 200 // no data, store the blob 201 ByteSource byteSource = Files.asByteSource(file); 202 Blob remoteBlob = blobStore.blobBuilder(digest).payload(byteSource).contentLength(byteSource.size()).contentMD5( 203 byteSource.hash(Hashing.md5())).build(); 204 try { 205 blobStore.putBlob(container, remoteBlob); 206 } catch (Exception e) { 207 throw new IOException("Unable to store binary", e); 208 } 209 // validate storage 210 // TODO only check presence and size/md5 211 Blob checkBlob; 212 try { 213 checkBlob = blobStore.getBlob(container, digest); 214 } catch (Exception e) { 215 try { 216 // Remote blob can't be validated - remove it 217 blobStore.removeBlob(container, digest); 218 } catch (Exception e2) { 219 log.error("Possible data corruption : binary " + digest 220 + " validation failed but it could not be removed."); 221 } 222 throw new IOException("Unable to validate stored binary", e); 223 } 224 if (checkBlob == null 225 || remoteBlob.getMetadata().getContentMetadata().getContentLength() != checkBlob.getMetadata().getContentMetadata().getContentLength()) { 226 if (checkBlob != null) { 227 // Remote blob is incomplete - remove it 228 try { 229 blobStore.removeBlob(container, digest); 230 } catch (Exception e2) { 231 log.error("Possible data corruption : binary " + digest 232 + " validation failed but it could not be removed."); 233 } 234 } 235 throw new IOException("Upload to blob store failed"); 236 } 237 } 238 } 239 240 @Override 241 public boolean fetchFile(String digest, File tmp) { 242 Blob remoteBlob; 243 try { 244 remoteBlob = blobStore.getBlob(container, digest); 245 } catch (Exception e) { 246 log.error("Could not cache binary from remote storage: " + digest, e); 247 return false; 248 } 249 if (remoteBlob == null) { 250 log.error("Unknown binary: " + digest); 251 return false; 252 } else { 253 InputStream remoteStream = remoteBlob.getPayload().getInput(); 254 OutputStream localStream = null; 255 try { 256 localStream = new FileOutputStream(tmp); 257 IOUtils.copy(remoteStream, localStream); 258 } catch (IOException e) { 259 log.error("Unable to cache binary from remote storage: " + digest, e); 260 return false; 261 } finally { 262 IOUtils.closeQuietly(remoteStream); 263 IOUtils.closeQuietly(localStream); 264 } 265 } 266 return true; 267 } 268 269 @Override 270 public Long fetchLength(String digest) { 271 Blob remoteBlob; 272 try { 273 remoteBlob = blobStore.getBlob(container, digest); 274 } catch (Exception e) { 275 log.error("Unable to fetch binary information from remote storage"); 276 return null; 277 } 278 if (remoteBlob == null) { 279 return null; 280 } else { 281 return remoteBlob.getMetadata().getContentMetadata().getContentLength(); 282 } 283 } 284 285 } 286 287 /** 288 * Garbage collector for the blobstore binaries that stores the marked (in use) binaries in memory. 289 */ 290 public static class JCloudsBinaryGarbageCollector implements BinaryGarbageCollector { 291 292 protected final JCloudsBinaryManager binaryManager; 293 294 protected volatile long startTime; 295 296 protected BinaryManagerStatus status; 297 298 protected Set<String> marked; 299 300 public JCloudsBinaryGarbageCollector(JCloudsBinaryManager binaryManager) { 301 this.binaryManager = binaryManager; 302 } 303 304 @Override 305 public String getId() { 306 return "jclouds/" + binaryManager.storeProvider + ":" + binaryManager.container; 307 } 308 309 @Override 310 public BinaryManagerStatus getStatus() { 311 return status; 312 } 313 314 @Override 315 public boolean isInProgress() { 316 // volatile as this is designed to be called from another thread 317 return startTime != 0; 318 } 319 320 @Override 321 public void start() { 322 if (startTime != 0) { 323 throw new RuntimeException("Alread started"); 324 } 325 startTime = System.currentTimeMillis(); 326 status = new BinaryManagerStatus(); 327 marked = new HashSet<>(); 328 } 329 330 @Override 331 public void mark(String digest) { 332 marked.add(digest); 333 } 334 335 @Override 336 public void stop(boolean delete) { 337 if (startTime == 0) { 338 throw new RuntimeException("Not started"); 339 } 340 341 Set<String> unmarked = new HashSet<>(); 342 ListContainerOptions options = ListContainerOptions.NONE; 343 for (;;) { 344 PageSet<? extends StorageMetadata> metadatas = binaryManager.blobStore.list(binaryManager.container, options); 345 for (StorageMetadata metadata : metadatas) { 346 String digest = metadata.getName(); 347 if (!isMD5(digest)) { 348 // ignore files that cannot be MD5 digests for safety 349 continue; 350 } 351 // TODO size in metadata available only in upcoming JClouds 1.9.0 (JCLOUDS-654) 352 if (marked.contains(digest)) { 353 status.numBinaries++; 354 // status.sizeBinaries += size; 355 } else { 356 status.numBinariesGC++; 357 // status.sizeBinariesGC += size; 358 // record file to delete 359 unmarked.add(digest); 360 marked.remove(digest); // optimize memory 361 } 362 } 363 String marker = metadatas.getNextMarker(); 364 if (marker == null) { 365 break; 366 } 367 options = ListContainerOptions.Builder.afterMarker(marker); 368 } 369 marked = null; // help GC 370 371 // delete unmarked objects 372 if (delete) { 373 for (String digest : unmarked) { 374 binaryManager.removeBinary(digest); 375 } 376 } 377 378 status.gcDuration = System.currentTimeMillis() - startTime; 379 startTime = 0; 380 } 381 } 382 383}