001/* 002 * (C) Copyright 2011-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mathieu Guillaume 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.blob.jclouds; 021 022import static org.apache.commons.lang3.StringUtils.isBlank; 023import static org.apache.commons.lang3.StringUtils.isNotBlank; 024 025import java.io.File; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.OutputStream; 030import java.net.Authenticator; 031import java.net.PasswordAuthentication; 032import java.util.HashSet; 033import java.util.Map; 034import java.util.Set; 035import java.util.regex.Pattern; 036 037import org.apache.commons.io.IOUtils; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.jclouds.ContextBuilder; 041import org.jclouds.blobstore.BlobStore; 042import org.jclouds.blobstore.BlobStoreContext; 043import org.jclouds.blobstore.domain.Blob; 044import org.jclouds.blobstore.domain.PageSet; 045import org.jclouds.blobstore.domain.StorageMetadata; 046import org.jclouds.blobstore.options.ListContainerOptions; 047import org.jclouds.domain.Location; 048import org.jclouds.domain.LocationBuilder; 049import org.jclouds.domain.LocationScope; 050import org.nuxeo.common.Environment; 051import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 052import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 053import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager; 054import org.nuxeo.ecm.core.blob.binary.FileStorage; 055import org.nuxeo.runtime.api.Framework; 056 057import com.google.common.hash.Hashing; 058import com.google.common.io.ByteSource; 059import com.google.common.io.Files; 060 061/** 062 * A Binary Manager that stores binaries in cloud blob stores using jclouds. 063 * <p> 064 * The BLOBs are cached locally on first access for efficiency. 065 * <p> 066 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 067 * if accessed before the stream. 068 */ 069public class JCloudsBinaryManager extends CachingBinaryManager { 070 071 private static final Log log = LogFactory.getLog(JCloudsBinaryManager.class); 072 073 public static final String BLOBSTORE_PROVIDER_KEY = "jclouds.blobstore.provider"; 074 075 public static final String BLOBSTORE_MAP_NAME_KEY = "jclouds.blobstore.name"; 076 077 public static final String BLOBSTORE_LOCATION_KEY = "jclouds.blobstore.location"; 078 079 public static final String BLOBSTORE_ENDPOINT_KEY = "jclouds.blobstore.endpoint"; 080 081 public static final String DEFAULT_LOCATION = null; 082 083 public static final String BLOBSTORE_IDENTITY_KEY = "jclouds.blobstore.identity"; 084 085 public static final String BLOBSTORE_SECRET_KEY = "jclouds.blobstore.secret"; 086 087 public static final String CACHE_SIZE_KEY = "jclouds.blobstore.cachesize"; 088 089 public static final String DEFAULT_CACHE_SIZE = "100 MB"; 090 091 private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}"); 092 093 protected String container; 094 095 protected String endpoint; 096 097 protected String storeProvider; 098 099 protected BlobStore blobStore; 100 101 @Override 102 public void initialize(String blobProviderId, Map<String, String> properties) throws IOException { 103 super.initialize(blobProviderId, properties); 104 105 // Get settings from the configuration 106 storeProvider = getConfigurationProperty(BLOBSTORE_PROVIDER_KEY, properties); 107 if (isBlank(storeProvider)) { 108 throw new RuntimeException("Missing conf: " + BLOBSTORE_PROVIDER_KEY); 109 } 110 111 container = getConfigurationProperty(BLOBSTORE_MAP_NAME_KEY, properties); 112 if (isBlank(container)) { 113 throw new RuntimeException("Missing conf: " + BLOBSTORE_MAP_NAME_KEY); 114 } 115 116 endpoint = getConfigurationProperty(BLOBSTORE_ENDPOINT_KEY, properties); 117 118 String storeLocation = getConfigurationProperty(BLOBSTORE_LOCATION_KEY, properties); 119 if (isBlank(storeLocation)) { 120 storeLocation = null; 121 } 122 123 String storeIdentity = getConfigurationProperty(BLOBSTORE_IDENTITY_KEY, properties); 124 if (isBlank(storeIdentity)) { 125 throw new RuntimeException("Missing conf: " + BLOBSTORE_IDENTITY_KEY); 126 } 127 128 String storeSecret = getConfigurationProperty(BLOBSTORE_SECRET_KEY, properties); 129 if (isBlank(storeSecret)) { 130 throw new RuntimeException("Missing conf: " + BLOBSTORE_SECRET_KEY); 131 } 132 133 String cacheSizeStr = getConfigurationProperty(CACHE_SIZE_KEY, properties); 134 if (isBlank(cacheSizeStr)) { 135 cacheSizeStr = DEFAULT_CACHE_SIZE; 136 } 137 138 String proxyHost = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_HOST); 139 String proxyPort = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PORT); 140 final String proxyLogin = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_LOGIN); 141 final String proxyPassword = Framework.getProperty(Environment.NUXEO_HTTP_PROXY_PASSWORD); 142 143 // Set up proxy 144 if (isNotBlank(proxyHost)) { 145 System.setProperty("https.proxyHost", proxyHost); 146 } 147 if (isNotBlank(proxyPort)) { 148 System.setProperty("https.proxyPort", proxyPort); 149 } 150 if (isNotBlank(proxyLogin)) { 151 System.setProperty("https.proxyUser", proxyLogin); 152 System.setProperty("https.proxyPassword", proxyPassword); 153 Authenticator.setDefault(new Authenticator() { 154 @Override 155 public PasswordAuthentication getPasswordAuthentication() { 156 return new PasswordAuthentication(proxyLogin, proxyPassword.toCharArray()); 157 } 158 }); 159 } 160 161 ContextBuilder builder = ContextBuilder.newBuilder(storeProvider).credentials(storeIdentity, storeSecret); 162 163 if (isNotBlank(endpoint)) { 164 builder.endpoint(endpoint); 165 } 166 167 BlobStoreContext context = builder.buildView(BlobStoreContext.class); 168 169 // Try to create container if it doesn't exist 170 blobStore = context.getBlobStore(); 171 boolean created; 172 if (storeLocation == null) { 173 created = blobStore.createContainerInLocation(null, container); 174 } else { 175 Location location = new LocationBuilder().scope(LocationScope.REGION) 176 .id(storeLocation) 177 .description(storeLocation) 178 .build(); 179 created = blobStore.createContainerInLocation(location, container); 180 } 181 if (created) { 182 log.debug("Created container " + container); 183 } 184 185 // Create file cache 186 initializeCache(cacheSizeStr, new JCloudsFileStorage()); 187 createGarbageCollector(); 188 } 189 190 // Get a property based first on the value in the properties map, then 191 // from the system properties. 192 private String getConfigurationProperty(String key, Map<String, String> properties) { 193 String value = properties.get(key); 194 if (isBlank(value)) { 195 value = Framework.getProperty(key); 196 } 197 return value; 198 } 199 200 protected void createGarbageCollector() { 201 garbageCollector = new JCloudsBinaryGarbageCollector(this); 202 } 203 204 protected void removeBinary(String digest) { 205 blobStore.removeBlob(container, digest); 206 } 207 208 public static boolean isMD5(String digest) { 209 return MD5_RE.matcher(digest).matches(); 210 } 211 212 public class JCloudsFileStorage implements FileStorage { 213 214 @Override 215 public void storeFile(String digest, File file) throws IOException { 216 Blob currentObject; 217 try { 218 currentObject = blobStore.getBlob(container, digest); 219 } catch (Exception e) { 220 throw new IOException("Unable to check existence of binary", e); 221 } 222 if (currentObject == null) { 223 // no data, store the blob 224 ByteSource byteSource = Files.asByteSource(file); 225 Blob remoteBlob = blobStore.blobBuilder(digest) 226 .payload(byteSource) 227 .contentLength(byteSource.size()) 228 .contentMD5(byteSource.hash(Hashing.md5())) 229 .build(); 230 try { 231 blobStore.putBlob(container, remoteBlob); 232 } catch (Exception e) { 233 throw new IOException("Unable to store binary", e); 234 } 235 // validate storage 236 // TODO only check presence and size/md5 237 Blob checkBlob; 238 try { 239 checkBlob = blobStore.getBlob(container, digest); 240 } catch (Exception e) { 241 try { 242 // Remote blob can't be validated - remove it 243 blobStore.removeBlob(container, digest); 244 } catch (Exception e2) { 245 log.error("Possible data corruption : binary " + digest 246 + " validation failed but it could not be removed."); 247 } 248 throw new IOException("Unable to validate stored binary", e); 249 } 250 if (checkBlob == null || !remoteBlob.getMetadata().getContentMetadata().getContentLength().equals( 251 checkBlob.getMetadata().getContentMetadata().getContentLength())) { 252 if (checkBlob != null) { 253 // Remote blob is incomplete - remove it 254 try { 255 blobStore.removeBlob(container, digest); 256 } catch (Exception e2) { 257 log.error("Possible data corruption : binary " + digest 258 + " validation failed but it could not be removed."); 259 } 260 } 261 throw new IOException("Upload to blob store failed"); 262 } 263 } 264 } 265 266 @Override 267 public boolean fetchFile(String digest, File tmp) { 268 Blob remoteBlob; 269 try { 270 remoteBlob = blobStore.getBlob(container, digest); 271 } catch (Exception e) { 272 log.error("Could not cache binary from remote storage: " + digest, e); 273 return false; 274 } 275 if (remoteBlob == null) { 276 log.error("Unknown binary: " + digest); 277 return false; 278 } else { 279 InputStream remoteStream = remoteBlob.getPayload().getInput(); 280 OutputStream localStream = null; 281 try { 282 localStream = new FileOutputStream(tmp); 283 IOUtils.copy(remoteStream, localStream); 284 } catch (IOException e) { 285 log.error("Unable to cache binary from remote storage: " + digest, e); 286 return false; 287 } finally { 288 IOUtils.closeQuietly(remoteStream); 289 IOUtils.closeQuietly(localStream); 290 } 291 } 292 return true; 293 } 294 } 295 296 /** 297 * Garbage collector for the blobstore binaries that stores the marked (in use) binaries in memory. 298 */ 299 public static class JCloudsBinaryGarbageCollector implements BinaryGarbageCollector { 300 301 protected final JCloudsBinaryManager binaryManager; 302 303 protected volatile long startTime; 304 305 protected BinaryManagerStatus status; 306 307 protected Set<String> marked; 308 309 public JCloudsBinaryGarbageCollector(JCloudsBinaryManager binaryManager) { 310 this.binaryManager = binaryManager; 311 } 312 313 @Override 314 public String getId() { 315 return "jclouds/" + binaryManager.storeProvider + ":" + binaryManager.container; 316 } 317 318 @Override 319 public BinaryManagerStatus getStatus() { 320 return status; 321 } 322 323 @Override 324 public boolean isInProgress() { 325 // volatile as this is designed to be called from another thread 326 return startTime != 0; 327 } 328 329 @Override 330 public void start() { 331 if (startTime != 0) { 332 throw new RuntimeException("Alread started"); 333 } 334 startTime = System.currentTimeMillis(); 335 status = new BinaryManagerStatus(); 336 marked = new HashSet<>(); 337 } 338 339 @Override 340 public void mark(String digest) { 341 marked.add(digest); 342 } 343 344 @Override 345 public void stop(boolean delete) { 346 if (startTime == 0) { 347 throw new RuntimeException("Not started"); 348 } 349 350 Set<String> unmarked = new HashSet<>(); 351 ListContainerOptions options = ListContainerOptions.NONE; 352 for (;;) { 353 PageSet<? extends StorageMetadata> metadatas = binaryManager.blobStore.list(binaryManager.container, 354 options); 355 for (StorageMetadata metadata : metadatas) { 356 String digest = metadata.getName(); 357 if (!isMD5(digest)) { 358 // ignore files that cannot be MD5 digests for safety 359 continue; 360 } 361 // TODO size in metadata available only in upcoming JClouds 1.9.0 (JCLOUDS-654) 362 if (marked.contains(digest)) { 363 status.numBinaries++; 364 // status.sizeBinaries += size; 365 } else { 366 status.numBinariesGC++; 367 // status.sizeBinariesGC += size; 368 // record file to delete 369 unmarked.add(digest); 370 marked.remove(digest); // optimize memory 371 } 372 } 373 String marker = metadatas.getNextMarker(); 374 if (marker == null) { 375 break; 376 } 377 options = ListContainerOptions.Builder.afterMarker(marker); 378 } 379 marked = null; // help GC 380 381 // delete unmarked objects 382 if (delete) { 383 for (String digest : unmarked) { 384 binaryManager.removeBinary(digest); 385 } 386 } 387 388 status.gcDuration = System.currentTimeMillis() - startTime; 389 startTime = 0; 390 } 391 } 392 393}