001/* 002 * (C) Copyright 2011-2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Mathieu Guillaume 016 * Florent Guillaume 017 */ 018package org.nuxeo.ecm.blob.jclouds; 019 020import static org.apache.commons.lang.StringUtils.isBlank; 021import static org.apache.commons.lang.StringUtils.isNotBlank; 022 023import java.io.File; 024import java.io.FileOutputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.net.Authenticator; 029import java.net.PasswordAuthentication; 030import java.util.HashSet; 031import java.util.Map; 032import java.util.Set; 033import java.util.regex.Pattern; 034 035import org.apache.commons.io.IOUtils; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.jclouds.ContextBuilder; 039import org.jclouds.blobstore.BlobStore; 040import org.jclouds.blobstore.BlobStoreContext; 041import org.jclouds.blobstore.domain.Blob; 042import org.jclouds.blobstore.domain.PageSet; 043import org.jclouds.blobstore.domain.StorageMetadata; 044import org.jclouds.blobstore.options.ListContainerOptions; 045import org.jclouds.domain.Location; 046import org.jclouds.domain.LocationBuilder; 047import org.jclouds.domain.LocationScope; 048import org.nuxeo.common.Environment; 049import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 050import 
org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.runtime.api.Framework;
import org.jclouds.blobstore.domain.BlobMetadata;

import com.google.common.hash.Hashing;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;

/**
 * A Binary Manager that stores binaries in cloud blob stores using jclouds.
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
 * <p>
 * Configuration is read from framework properties: {@value #BLOBSTORE_PROVIDER_KEY}, {@value #BLOBSTORE_MAP_NAME_KEY},
 * {@value #BLOBSTORE_IDENTITY_KEY} and {@value #BLOBSTORE_SECRET_KEY} are required; {@value #BLOBSTORE_LOCATION_KEY}
 * and {@value #CACHE_SIZE_KEY} are optional. Each binary is stored in the configured container under its digest.
 */
public class JCloudsBinaryManager extends CachingBinaryManager {

    private static final Log log = LogFactory.getLog(JCloudsBinaryManager.class);

    /** Framework property naming the jclouds blob store provider (required). */
    public static final String BLOBSTORE_PROVIDER_KEY = "jclouds.blobstore.provider";

    /** Framework property naming the container holding the binaries (required). */
    public static final String BLOBSTORE_MAP_NAME_KEY = "jclouds.blobstore.name";

    /** Framework property giving the region id in which the container is created (optional). */
    public static final String BLOBSTORE_LOCATION_KEY = "jclouds.blobstore.location";

    /** Location used when none is configured: {@code null} lets the provider choose. */
    public static final String DEFAULT_LOCATION = null;

    /** Framework property holding the blob store identity (required). */
    public static final String BLOBSTORE_IDENTITY_KEY = "jclouds.blobstore.identity";

    /** Framework property holding the blob store secret (required). */
    public static final String BLOBSTORE_SECRET_KEY = "jclouds.blobstore.secret";

    /** Framework property giving the size of the local file cache (optional). */
    public static final String CACHE_SIZE_KEY = "jclouds.blobstore.cachesize";

    /** Local cache size used when {@link #CACHE_SIZE_KEY} is not configured. */
    public static final String DEFAULT_CACHE_SIZE = "100 MB";

    /** Lowercase hexadecimal MD5 digest: exactly 32 hex characters. */
    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    /** Name of the container in which binaries are stored. */
    protected String container;

    /** jclouds provider id used to build the blob store context. */
    protected String storeProvider;

    /** Blob store obtained from the jclouds context during {@link #initialize}. */
    protected BlobStore blobStore;

    /**
     * Reads the jclouds configuration and sets up the HTTPS proxy if one is configured. Then connects to the blob
     * store, creates the container if it does not exist yet, and initializes the local file cache and the garbage
     * collector.
     *
     * @throws RuntimeException if a required configuration property is missing or blank
     */
    @Override
    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
        super.initialize(blobProviderId, properties);

        // Get settings from the configuration
        // TODO parse properties too
        storeProvider = getRequiredProperty(BLOBSTORE_PROVIDER_KEY);
        container = getRequiredProperty(BLOBSTORE_MAP_NAME_KEY);

        String storeLocation = Framework.getProperty(BLOBSTORE_LOCATION_KEY);
        if (isBlank(storeLocation)) {
            storeLocation = DEFAULT_LOCATION;
        }

        String storeIdentity = getRequiredProperty(BLOBSTORE_IDENTITY_KEY);
        String storeSecret = getRequiredProperty(BLOBSTORE_SECRET_KEY);

        String cacheSizeStr = Framework.getProperty(CACHE_SIZE_KEY);
        if (isBlank(cacheSizeStr)) {
            cacheSizeStr = DEFAULT_CACHE_SIZE;
        }

        configureProxy();

        BlobStoreContext context = ContextBuilder.newBuilder(storeProvider)
                                                 .credentials(storeIdentity, storeSecret)
                                                 .buildView(BlobStoreContext.class);

        // Try to create container if it doesn't exist
        blobStore = context.getBlobStore();
        Location location = null;
        if (storeLocation != null) {
            location = new LocationBuilder().scope(LocationScope.REGION)
                                            .id(storeLocation)
                                            .description(storeLocation)
                                            .build();
        }
        if (blobStore.createContainerInLocation(location, container)) {
            log.debug("Created container " + container);
        }

        // Create file cache
        initializeCache(cacheSizeStr, new JCloudsFileStorage());
        createGarbageCollector();
    }

    /**
     * Reads a mandatory framework property.
     *
     * @param key the property name
     * @return the non-blank property value
     * @throws RuntimeException if the property is missing or blank
     */
    private static String getRequiredProperty(String key) {
        String value = Framework.getProperty(key);
        if (isBlank(value)) {
            throw new RuntimeException("Missing conf: " + key);
        }
        return value;
    }

    /**
     * Applies the Nuxeo HTTP proxy settings, if any, to the JVM-wide HTTPS proxy system properties. When a proxy
     * login is configured, it also installs a default {@link Authenticator}.
     * <p>
     * Both system properties and the default authenticator are global to the JVM, so they also affect HTTPS
     * connections made outside this binary manager.
     */
    private static void configureProxy() {
        String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST);
        String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT);
        final String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN);
        String configuredPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD);
        // System.setProperty rejects null values, and the authenticator needs a char[]: default to empty
        final String proxyPassword = configuredPassword == null ? "" : configuredPassword;

        if (isNotBlank(proxyHost)) {
            System.setProperty("https.proxyHost", proxyHost);
        }
        if (isNotBlank(proxyPort)) {
            System.setProperty("https.proxyPort", proxyPort);
        }
        if (isNotBlank(proxyLogin)) {
            System.setProperty("https.proxyUser", proxyLogin);
            System.setProperty("https.proxyPassword", proxyPassword);
            Authenticator.setDefault(new Authenticator() {
                @Override
                public PasswordAuthentication getPasswordAuthentication() {
                    return new PasswordAuthentication(proxyLogin, proxyPassword.toCharArray());
                }
            });
        }
    }

    /** Creates the garbage collector used for this binary manager. */
    protected void createGarbageCollector() {
        garbageCollector = new JCloudsBinaryGarbageCollector(this);
    }

    /**
     * Removes the blob stored under the given digest from the container.
     *
     * @param digest the binary digest, used as the blob name
     */
    protected void removeBinary(String digest) {
        blobStore.removeBlob(container, digest);
    }

    /**
     * Checks whether a string looks like an MD5 digest, i.e. exactly 32 lowercase hexadecimal characters.
     *
     * @param digest the string to check (must not be {@code null})
     * @return {@code true} if it matches the MD5 digest format
     */
    public static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    /**
     * {@link FileStorage} backed by the blob store: binaries are stored under their digest in {@link #container}.
     */
    public class JCloudsFileStorage implements FileStorage {

        /**
         * Uploads the file under the given digest unless a blob with that name already exists. Afterwards it checks
         * that the stored blob is present and has the expected length.
         *
         * @throws IOException if the existence check, the upload or the validation fails; an upload whose validation
         *             fails is removed (best effort) before throwing
         */
        @Override
        public void storeFile(String digest, File file) throws IOException {
            boolean exists;
            try {
                // metadata-only check: avoids opening (and leaking) a content stream
                exists = blobStore.blobExists(container, digest);
            } catch (Exception e) {
                throw new IOException("Unable to check existence of binary", e);
            }
            if (exists) {
                // a blob is already stored under this digest, nothing to upload
                return;
            }

            ByteSource byteSource = Files.asByteSource(file);
            long length = byteSource.size();
            Blob remoteBlob = blobStore.blobBuilder(digest)
                                       .payload(byteSource)
                                       .contentLength(length)
                                       .contentMD5(byteSource.hash(Hashing.md5()))
                                       .build();
            try {
                blobStore.putBlob(container, remoteBlob);
            } catch (Exception e) {
                throw new IOException("Unable to store binary", e);
            }

            // validate storage by reading back the stored metadata (presence and size)
            // TODO also compare the MD5
            BlobMetadata stored;
            try {
                stored = blobStore.blobMetadata(container, digest);
            } catch (Exception e) {
                // Remote blob can't be validated - remove it
                removeInvalidBlob(digest);
                throw new IOException("Unable to validate stored binary", e);
            }
            if (stored == null) {
                throw new IOException("Upload to blob store failed");
            }
            Long storedLength = stored.getContentMetadata().getContentLength();
            // compare as primitives: getContentLength() returns a Long, and != between two Longs compares references
            if (storedLength == null || storedLength.longValue() != length) {
                // Remote blob is incomplete - remove it
                removeInvalidBlob(digest);
                throw new IOException("Upload to blob store failed");
            }
        }

        /** Best-effort removal of a blob that failed validation; logs (does not throw) if removal fails. */
        private void removeInvalidBlob(String digest) {
            try {
                blobStore.removeBlob(container, digest);
            } catch (Exception e) {
                log.error("Possible data corruption : binary " + digest
                        + " validation failed but it could not be removed.", e);
            }
        }

        /**
         * Downloads the blob stored under the given digest into {@code tmp}.
         *
         * @return {@code true} on success, {@code false} if the blob is unknown or could not be read or written
         */
        @Override
        public boolean fetchFile(String digest, File tmp) {
            Blob remoteBlob;
            try {
                remoteBlob = blobStore.getBlob(container, digest);
            } catch (Exception e) {
                log.error("Could not cache binary from remote storage: " + digest, e);
                return false;
            }
            if (remoteBlob == null) {
                log.error("Unknown binary: " + digest);
                return false;
            }
            // try-with-resources closes both streams; a failing close of the local file is reported as a failure
            try (InputStream remoteStream = remoteBlob.getPayload().getInput();
                    OutputStream localStream = new FileOutputStream(tmp)) {
                IOUtils.copy(remoteStream, localStream);
            } catch (IOException e) {
                log.error("Unable to cache binary from remote storage: " + digest, e);
                return false;
            }
            return true;
        }

        /**
         * Returns the content length of the blob stored under the given digest.
         *
         * @return the length, or {@code null} if the blob is unknown, its length is not available, or the lookup
         *         failed
         */
        @Override
        public Long fetchLength(String digest) {
            BlobMetadata metadata;
            try {
                // metadata-only lookup: no content stream is opened
                metadata = blobStore.blobMetadata(container, digest);
            } catch (Exception e) {
                log.error("Unable to fetch binary information from remote storage: " + digest, e);
                return null;
            }
            if (metadata == null) {
                return null;
            }
            return metadata.getContentMetadata().getContentLength();
        }

    }

    /**
     * Garbage collector for the blobstore binaries that stores the marked (in use) binaries in memory.
     */
    public static class JCloudsBinaryGarbageCollector implements BinaryGarbageCollector {

        protected final JCloudsBinaryManager binaryManager;

        /** Start time of the current run in milliseconds, or 0 when no run is in progress. */
        protected volatile long startTime;

        protected BinaryManagerStatus status;

        /** Digests marked as in use during the current run. */
        protected Set<String> marked;

        public JCloudsBinaryGarbageCollector(JCloudsBinaryManager binaryManager) {
            this.binaryManager = binaryManager;
        }

        @Override
        public String getId() {
            return "jclouds/" + binaryManager.storeProvider + ":" + binaryManager.container;
        }

        @Override
        public BinaryManagerStatus getStatus() {
            return status;
        }

        @Override
        public boolean isInProgress() {
            // volatile as this is designed to be called from another thread
            return startTime != 0;
        }

        /**
         * Starts a new run with a fresh status and an empty set of marked digests.
         *
         * @throws RuntimeException if a run is already in progress
         */
        @Override
        public void start() {
            if (startTime != 0) {
                throw new RuntimeException("Already started");
            }
            startTime = System.currentTimeMillis();
            status = new BinaryManagerStatus();
            marked = new HashSet<>();
        }

        /** Records the given digest as in use for the current run. */
        @Override
        public void mark(String digest) {
            marked.add(digest);
        }

        /**
         * Ends the current run: counts marked and unmarked binaries and, if {@code delete} is true, removes the
         * unmarked ones. The run is always ended, even if listing or deletion fails, so a new run can be started.
         *
         * @param delete whether unmarked binaries should actually be removed
         * @throws RuntimeException if no run is in progress
         */
        @Override
        public void stop(boolean delete) {
            if (startTime == 0) {
                throw new RuntimeException("Not started");
            }
            try {
                Set<String> unmarked = collectUnmarked();
                marked = null; // help GC

                // delete unmarked objects
                if (delete) {
                    for (String digest : unmarked) {
                        binaryManager.removeBinary(digest);
                    }
                }

                status.gcDuration = System.currentTimeMillis() - startTime;
            } finally {
                // always end the run, otherwise a failure would leave the GC permanently "in progress"
                marked = null;
                startTime = 0;
            }
        }

        /**
         * Lists the container page by page. Marked binaries are counted in the status; the digests of unmarked ones
         * are counted and collected. Names that are not MD5 digests are skipped.
         *
         * @return the digests of the unmarked binaries
         */
        private Set<String> collectUnmarked() {
            Set<String> unmarked = new HashSet<>();
            ListContainerOptions options = ListContainerOptions.NONE;
            for (;;) {
                PageSet<? extends StorageMetadata> metadatas = binaryManager.blobStore.list(binaryManager.container,
                        options);
                for (StorageMetadata metadata : metadatas) {
                    String digest = metadata.getName();
                    if (!isMD5(digest)) {
                        // ignore files that cannot be MD5 digests for safety
                        continue;
                    }
                    // TODO sizeBinaries/sizeBinariesGC: size in metadata available only in JClouds 1.9.0 (JCLOUDS-654)
                    if (marked.contains(digest)) {
                        status.numBinaries++;
                    } else {
                        status.numBinariesGC++;
                        // record file to delete
                        unmarked.add(digest);
                    }
                }
                String marker = metadatas.getNextMarker();
                if (marker == null) {
                    break;
                }
                options = ListContainerOptions.Builder.afterMarker(marker);
            }
            return unmarked;
        }
    }

}