/*
 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Nuxeo
 */
package org.nuxeo.ecm.core.storage.gcp;

import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.FileStorage;

import com.google.api.gax.paging.Page;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.StorageOptions;

/**
 * A Binary Manager that stores binaries as Google Storage BLOBs.
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
 * <p>
 * The GCP credentials file can be configured in the following ways:
 * <ul>
 * <li>nuxeo.gcp.credentials=/path/to/file.json</li>
 * <li>nuxeo.gcp.credentials=file.json (located in nxserver/config)</li>
 * <li>If nothing is set, Nuxeo will look into 'gcp-credentials.json' file by default (located in nxserver/config)</li>
 * </ul>
 *
 * @since 10.10-HF12
 */
public class GoogleStorageBinaryManager extends AbstractCloudBinaryManager {

    private static final Logger log = LogManager.getLogger(GoogleStorageBinaryManager.class);

    public static final String BUCKET_NAME_PROPERTY = "storage.bucket";

    public static final String BUCKET_PREFIX_PROPERTY = "storage.bucket_prefix";

    /** @since 11.4 */
    public static final String UPLOAD_CHUNK_SIZE_PROPERTY = "storage.upload.chunk.size";

    /**
     * Default is taken from {@link com.google.cloud.BaseWriteChannel}.
     *
     * @since 11.4
     */
    public static final int DEFAULT_UPLOAD_CHUNK_SIZE = 2048 * 1024; // 2 MB

    public static final String PROJECT_ID_PROPERTY = "project";

    public static final String GOOGLE_APPLICATION_CREDENTIALS = "credentials";

    public static final String GOOGLE_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";

    public static final String GOOGLE_STORAGE_SCOPE = "https://www.googleapis.com/auth/devstorage.full_control";

    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.gcp";

    /** Shape of the keys considered by the garbage collector: 32 lowercase hexadecimal characters. */
    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");

    public static final String DELIMITER = "/";

    /** Name of the credentials file used when no {@link #GOOGLE_APPLICATION_CREDENTIALS} property is set. */
    public static final String GCP_JSON_FILE = "gcp-credentials.json";

    protected String bucketName;

    /** Prefix prepended to every digest to build the blob name; empty or ending with {@link #DELIMITER}. */
    protected String bucketPrefix;

    protected Bucket bucket;

    protected Storage storage;

    /** @since 11.4 */
    protected int chunkSize;

    /**
     * Builds the Google Storage client from the configured credentials and project, resolves (or creates) the
     * configured bucket, and computes the blob name prefix and the upload chunk size.
     *
     * @throws NuxeoException if the credentials file cannot be read or the credentials cannot be refreshed
     */
    @Override
    protected void setupCloudClient() {
        try {
            String projectId = getProperty(PROJECT_ID_PROPERTY);

            // a relative credentials path is resolved against the server configuration directory
            Path credentialsPath = Path.of(getProperty(GOOGLE_APPLICATION_CREDENTIALS, GCP_JSON_FILE));
            if (!credentialsPath.isAbsolute()) {
                credentialsPath = Environment.getDefault().getConfig().toPath().resolve(credentialsPath);
            }
            GoogleCredentials credentials;
            // make sure the credentials file handle is released once parsed
            try (var credentialsStream = Files.newInputStream(credentialsPath)) {
                credentials = GoogleCredentials.fromStream(credentialsStream)
                                               .createScoped(GOOGLE_PLATFORM_SCOPE, GOOGLE_STORAGE_SCOPE);
            }
            credentials.refreshIfExpired();

            storage = StorageOptions.newBuilder()
                                    .setCredentials(credentials)
                                    .setProjectId(projectId)
                                    .build()
                                    .getService();
            bucketName = getProperty(BUCKET_NAME_PROPERTY);
            bucketPrefix = getProperty(BUCKET_PREFIX_PROPERTY, EMPTY);
            bucket = getOrCreateBucket(bucketName);
            chunkSize = getIntProperty(UPLOAD_CHUNK_SIZE_PROPERTY, DEFAULT_UPLOAD_CHUNK_SIZE);
            if (chunkSize <= 0) {
                // a zero-sized buffer would make the upload loop stop immediately and store an empty blob,
                // a negative one cannot even be allocated
                log.warn("Invalid upload chunk size ({}): {}, using default: {}", UPLOAD_CHUNK_SIZE_PROPERTY,
                        chunkSize, DEFAULT_UPLOAD_CHUNK_SIZE);
                chunkSize = DEFAULT_UPLOAD_CHUNK_SIZE;
            }

            if (!isBlank(bucketPrefix) && !bucketPrefix.endsWith(DELIMITER)) {
                log.warn("Google bucket prefix ({}): {} should end with '/': added automatically.",
                        BUCKET_PREFIX_PROPERTY, bucketPrefix);
                bucketPrefix += DELIMITER;
            }
            if (isNotBlank(namespace)) {
                // use namespace as an additional prefix
                bucketPrefix += namespace;
                if (!bucketPrefix.endsWith(DELIMITER)) {
                    bucketPrefix += DELIMITER;
                }
            }
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Gets or creates a bucket with the given {@code bucketName}.
     *
     * @param bucketName the name of the bucket to look up, and to create if the lookup returns nothing
     * @return the bucket instance.
     */
    public Bucket getOrCreateBucket(String bucketName) {
        Bucket existing = storage.get(bucketName);
        if (existing != null) {
            return existing;
        }
        log.debug("Creating a new bucket: {}", bucketName);
        return storage.create(BucketInfo.of(bucketName));
    }

    /**
     * Deletes a bucket (and all its blobs) with the given {@code bucketName}.
     *
     * @param bucketName the name of the bucket to delete
     * @return {@code true} if the bucket has been deleted, {@code false} otherwise (including when no bucket with
     *         that name is found)
     */
    public boolean deleteBucket(String bucketName) {
        Bucket target = storage.get(bucketName);
        if (target == null) {
            // nothing to delete, and listing the blobs of a missing bucket would fail
            return false;
        }
        // blobs are deleted first, then the bucket itself
        for (Blob blob : storage.list(bucketName).iterateAll()) {
            blob.delete();
        }
        return target.exists() && storage.delete(bucketName);
    }

    /**
     * @return the bucket resolved by {@link #setupCloudClient()}
     */
    public Bucket getBucket() {
        return bucket;
    }

    @Override
    protected FileStorage getFileStorage() {
        return new GCPFileStorage();
    }

    /**
     * {@link FileStorage} implementation reading and writing blobs in the bucket of the enclosing manager, under the
     * name {@code bucketPrefix + digest}.
     */
    public class GCPFileStorage implements FileStorage {

        /**
         * Uploads the given file under {@code bucketPrefix + digest}, unless a blob with that name already exists.
         *
         * @param digest the digest used to build the blob name
         * @param file the local file whose content is uploaded
         * @throws NuxeoException if reading the file or writing to the storage fails with an {@link IOException}
         */
        @Override
        public void storeFile(String digest, File file) {
            long t0 = System.currentTimeMillis();
            log.debug("Storing blob with digest: {} to GCS", digest);
            String key = bucketPrefix + digest;
            // try to get the blob's metadata to check if it exists
            if (bucket.get(key) != null) {
                log.debug("Blob with digest: {} is already in GCS", digest);
                return;
            }
            try (var is = new BufferedInputStream(new FileInputStream(file));
                    var writer = storage.writer(BlobInfo.newBuilder(bucketName, key).build())) {
                byte[] buffer = new byte[chunkSize];
                writer.setChunkSize(chunkSize);
                int bufferLength;
                // IOUtils.read fills the buffer as much as possible, 0 is only returned at end of stream
                while ((bufferLength = IOUtils.read(is, buffer)) > 0) {
                    writer.write(ByteBuffer.wrap(buffer, 0, bufferLength));
                }
            } catch (IOException e) {
                throw new NuxeoException(e);
            }
            log.debug("Stored blob with digest: {} to GCS in {}ms", digest, System.currentTimeMillis() - t0);
        }

        /**
         * Downloads the blob named {@code bucketPrefix + key} into the given file.
         *
         * @param key the key used to build the blob name
         * @param file the local file receiving the blob content
         * @return {@code true} if the blob was found and downloaded, {@code false} if no such blob exists
         */
        @Override
        public boolean fetchFile(String key, File file) {
            Blob blob = bucket.get(bucketPrefix + key);
            if (blob == null) {
                return false;
            }
            blob.downloadTo(file.toPath());
            return true;
        }
    }

    @Override
    protected String getSystemPropertyPrefix() {
        return SYSTEM_PROPERTY_PREFIX;
    }

    /**
     * Garbage collector for GCP binaries that stores the marked (in use) binaries in memory.
     */
    public static class GoogleStorageBinaryGarbageCollector
            extends AbstractBinaryGarbageCollector<GoogleStorageBinaryManager> {

        protected GoogleStorageBinaryGarbageCollector(GoogleStorageBinaryManager binaryManager) {
            super(binaryManager);
        }

        @Override
        public String getId() {
            return "gcs:" + binaryManager.bucketName;
        }

        /**
         * Lists the blobs stored under the manager's prefix and returns the digests that are not marked, updating
         * the binaries / garbage-collected counters of {@code status} along the way.
         *
         * @return the digests of the blobs that are not marked
         */
        @Override
        public Set<String> getUnmarkedBlobs() {
            Set<String> unmarked = new HashSet<>();
            int prefixLength = binaryManager.bucketPrefix.length();
            Page<Blob> blobs = binaryManager.getBucket()
                                            .list(BlobListOption.fields(BlobField.ID, BlobField.SIZE),
                                                    BlobListOption.prefix(binaryManager.bucketPrefix));
            // iterateAll() already fetches the following pages lazily: a single traversal visits each blob once,
            // so the counters are not inflated by re-reading pages
            for (Blob blob : blobs.iterateAll()) {
                String digest = blob.getName().substring(prefixLength);
                if (!isMD5(digest)) {
                    // ignore files that cannot be MD5 digests for safety
                    continue;
                }
                if (marked.contains(digest)) {
                    status.numBinaries++;
                    status.sizeBinaries += blob.getSize();
                } else {
                    status.numBinariesGC++;
                    status.sizeBinariesGC += blob.getSize();
                    // record blob to delete
                    unmarked.add(digest);
                }
            }
            return unmarked;
        }
    }

    /**
     * Checks whether the given string has the shape of an MD5 digest.
     *
     * @param digest the string to check
     * @return {@code true} if {@code digest} is made of exactly 32 lowercase hexadecimal characters
     */
    protected static boolean isMD5(String digest) {
        return MD5_RE.matcher(digest).matches();
    }

    @Override
    protected BinaryGarbageCollector instantiateGarbageCollector() {
        return new GoogleStorageBinaryGarbageCollector(this);
    }

    /**
     * Removes from the bucket the blobs matching the given digests.
     *
     * @param digests the digests of the binaries to remove
     */
    @Override
    public void removeBinaries(Collection<String> digests) {
        digests.forEach(this::removeBinary);
    }

    /**
     * Deletes the blob named {@code bucketPrefix + digest} from the bucket.
     *
     * @param digest the digest of the binary to remove
     */
    protected void removeBinary(String digest) {
        storage.delete(BlobId.of(bucket.getName(), bucketPrefix + digest));
    }

}