001/*
002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo
018 */
019package org.nuxeo.ecm.core.storage.gcp;
020
021import static org.apache.commons.lang3.StringUtils.EMPTY;
022import static org.apache.commons.lang3.StringUtils.isBlank;
023import static org.apache.commons.lang3.StringUtils.isNotBlank;
024
025import java.io.BufferedInputStream;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.IOException;
029import java.nio.ByteBuffer;
030import java.nio.file.Files;
031import java.nio.file.Path;
032import java.util.Collection;
033import java.util.HashSet;
034import java.util.Set;
035import java.util.regex.Pattern;
036
037import org.apache.commons.io.IOUtils;
038import org.apache.logging.log4j.LogManager;
039import org.apache.logging.log4j.Logger;
040import org.nuxeo.common.Environment;
041import org.nuxeo.ecm.blob.AbstractBinaryGarbageCollector;
042import org.nuxeo.ecm.blob.AbstractCloudBinaryManager;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
045import org.nuxeo.ecm.core.blob.binary.FileStorage;
046
047import com.google.api.gax.paging.Page;
048import com.google.auth.oauth2.GoogleCredentials;
049import com.google.cloud.storage.Blob;
050import com.google.cloud.storage.BlobId;
051import com.google.cloud.storage.BlobInfo;
052import com.google.cloud.storage.Bucket;
053import com.google.cloud.storage.BucketInfo;
054import com.google.cloud.storage.Storage;
055import com.google.cloud.storage.Storage.BlobField;
056import com.google.cloud.storage.Storage.BlobListOption;
057import com.google.cloud.storage.StorageOptions;
058
059/**
060 * A Binary Manager that stores binaries as Google Storage BLOBs
061 * <p>
062 * The BLOBs are cached locally on first access for efficiency.
063 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
 * <p>
 * The GCP credentials file is located as follows:
 * <ul>
 * <li>nuxeo.gcp.credentials=/path/to/file.json: an absolute path is used as is</li>
 * <li>nuxeo.gcp.credentials=file.json: a relative path is resolved against nxserver/config</li>
 * <li>If nothing is set, Nuxeo looks for the 'gcp-credentials.json' file in nxserver/config</li>
070 * </ul>
071 *
072 * @since 10.10-HF12
073 */
074public class GoogleStorageBinaryManager extends AbstractCloudBinaryManager {
075
076    private static final Logger log = LogManager.getLogger(GoogleStorageBinaryManager.class);
077
078    public static final String BUCKET_NAME_PROPERTY = "storage.bucket";
079
080    public static final String BUCKET_PREFIX_PROPERTY = "storage.bucket_prefix";
081
082    /** @since 11.4 */
083    public static final String UPLOAD_CHUNK_SIZE_PROPERTY = "storage.upload.chunk.size";
084
085    /**
086     * Default is taken from {@link com.google.cloud.BaseWriteChannel}.
087     *
088     * @since 11.4
089     */
090    public static final int DEFAULT_UPLOAD_CHUNK_SIZE = 2048 * 1024; // 2 MB
091
092    public static final String PROJECT_ID_PROPERTY = "project";
093
094    public static final String GOOGLE_APPLICATION_CREDENTIALS = "credentials";
095
096    public static final String GOOGLE_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";
097
098    public static final String GOOGLE_STORAGE_SCOPE = "https://www.googleapis.com/auth/devstorage.full_control";
099
100    public static final String SYSTEM_PROPERTY_PREFIX = "nuxeo.gcp";
101
102    private static final Pattern MD5_RE = Pattern.compile("[0-9a-f]{32}");
103
104    public static final String DELIMITER = "/";
105
106    public static final String GCP_JSON_FILE = "gcp-credentials.json";
107
108    protected String bucketName;
109
110    protected String bucketPrefix;
111
112    protected Bucket bucket;
113
114    protected Storage storage;
115
116    /** @since 11.4 */
117    protected int chunkSize;
118
119    @Override
120    protected void setupCloudClient() {
121        try {
122            String projectId = getProperty(PROJECT_ID_PROPERTY);
123
124            Path credentialsPath = Path.of(getProperty(GOOGLE_APPLICATION_CREDENTIALS, GCP_JSON_FILE));
125            if (!credentialsPath.isAbsolute()) {
126                credentialsPath = Environment.getDefault().getConfig().toPath().resolve(credentialsPath);
127            }
128            GoogleCredentials credentials = GoogleCredentials.fromStream(Files.newInputStream(credentialsPath))
129                                                             .createScoped(GOOGLE_PLATFORM_SCOPE, GOOGLE_STORAGE_SCOPE);
130            credentials.refreshIfExpired();
131
132            storage = StorageOptions.newBuilder()
133                                    .setCredentials(credentials)
134                                    .setProjectId(projectId)
135                                    .build()
136                                    .getService();
137            bucketName = getProperty(BUCKET_NAME_PROPERTY);
138            bucketPrefix = getProperty(BUCKET_PREFIX_PROPERTY, EMPTY);
139            bucket = getOrCreateBucket(bucketName);
140            chunkSize = getIntProperty(UPLOAD_CHUNK_SIZE_PROPERTY, DEFAULT_UPLOAD_CHUNK_SIZE);
141
142            if (!isBlank(bucketPrefix) && !bucketPrefix.endsWith(DELIMITER)) {
143                log.warn("Google bucket prefix ({}): {} should end with '/': added automatically.",
144                        BUCKET_PREFIX_PROPERTY, bucketPrefix);
145                bucketPrefix += DELIMITER;
146            }
147            if (isNotBlank(namespace)) {
148                // use namespace as an additional prefix
149                bucketPrefix += namespace;
150                if (!bucketPrefix.endsWith(DELIMITER)) {
151                    bucketPrefix += DELIMITER;
152                }
153            }
154        } catch (IOException e) {
155            throw new NuxeoException(e);
156        }
157    }
158
159    /**
160     * Gets or creates a bucket with the given {@code bucketName}.
161     *
162     * @return the bucket instance.
163     */
164    public Bucket getOrCreateBucket(String bucketName) {
165        Bucket bucket = storage.get(bucketName);
166        if (bucket == null) {
167            log.debug("Creating a new bucket: {}", bucketName);
168            return storage.create(BucketInfo.of(bucketName));
169        }
170        return bucket;
171    }
172
173    /**
174     * Deletes a bucket (and all its blobs) with the given {@code bucketName}.
175     *
176     * @return boolean if bucket has been deleted or not.
177     */
178    public boolean deleteBucket(String bucketName) {
179        Bucket bucket = storage.get(bucketName);
180        for (Blob blob : storage.list(bucketName).iterateAll()) {
181            blob.delete();
182        }
183        return bucket.exists() && storage.delete(bucketName);
184    }
185
186    public Bucket getBucket() {
187        return bucket;
188    }
189
190    @Override
191    protected FileStorage getFileStorage() {
192        return new GCPFileStorage();
193    }
194
195    public class GCPFileStorage implements FileStorage {
196
197        @Override
198        public void storeFile(String digest, File file) {
199            long t0 = System.currentTimeMillis();
200            log.debug("Storing blob with digest: {} to GCS", digest);
201            String key = bucketPrefix + digest;
202            // try to get the blob's metadata to check if it exists
203            if (bucket.get(key) == null) {
204                try (var is = new BufferedInputStream(new FileInputStream(file));
205                        var writer = storage.writer(BlobInfo.newBuilder(bucketName, key).build())) {
206                    int bufferLength;
207                    byte[] buffer = new byte[chunkSize];
208                    writer.setChunkSize(chunkSize);
209                    while ((bufferLength = IOUtils.read(is, buffer)) > 0) {
210                        writer.write(ByteBuffer.wrap(buffer, 0, bufferLength));
211                    }
212                } catch (IOException e) {
213                    throw new NuxeoException(e);
214                }
215                log.debug("Stored blob with digest: {} to GCS in {}ms", digest, System.currentTimeMillis() - t0);
216            } else {
217                log.debug("Blob with digest: {} is already in GCS", digest);
218            }
219        }
220
221        @Override
222        public boolean fetchFile(String key, File file) {
223            Blob blob = bucket.get(bucketPrefix + key);
224            if (blob != null) {
225                blob.downloadTo(file.toPath());
226                return true;
227            }
228            return false;
229        }
230    }
231
232    @Override
233    protected String getSystemPropertyPrefix() {
234        return SYSTEM_PROPERTY_PREFIX;
235    }
236
237    /**
238     * Garbage collector for GCP binaries that stores the marked (in use) binaries in memory.
239     */
240    public static class GoogleStorageBinaryGarbageCollector
241            extends AbstractBinaryGarbageCollector<GoogleStorageBinaryManager> {
242
243        protected GoogleStorageBinaryGarbageCollector(GoogleStorageBinaryManager binaryManager) {
244            super(binaryManager);
245        }
246
247        @Override
248        public String getId() {
249            return "gcs:" + binaryManager.bucketName;
250        }
251
252        @Override
253        public Set<String> getUnmarkedBlobs() {
254            Set<String> unmarked = new HashSet<>();
255            Page<Blob> blobs = binaryManager.getBucket()
256                                            .list(BlobListOption.fields(BlobField.ID, BlobField.SIZE),
257                                                    BlobListOption.prefix(binaryManager.bucketPrefix));
258            do {
259                int prefixLength = binaryManager.bucketPrefix.length();
260                for (Blob blob : blobs.iterateAll()) {
261                    String digest = blob.getName().substring(prefixLength);
262                    if (!isMD5(digest)) {
263                        // ignore files that cannot be MD5 digests for
264                        // safety
265                        continue;
266                    }
267                    if (marked.contains(digest)) {
268                        status.numBinaries++;
269                        status.sizeBinaries += blob.getSize();
270                    } else {
271                        status.numBinariesGC++;
272                        status.sizeBinariesGC += blob.getSize();
273                        unmarked.add(digest);
274                        marked.remove(digest);
275                    }
276                }
277                blobs = blobs.getNextPage();
278            } while (blobs != null);
279            return unmarked;
280        }
281    }
282
283    protected static boolean isMD5(String digest) {
284        return MD5_RE.matcher(digest).matches();
285    }
286
287    @Override
288    protected BinaryGarbageCollector instantiateGarbageCollector() {
289        return new GoogleStorageBinaryGarbageCollector(this);
290    }
291
292    @Override
293    public void removeBinaries(Collection<String> digests) {
294        digests.forEach(this::removeBinary);
295    }
296
297    protected void removeBinary(String digest) {
298        storage.delete(BlobId.of(bucket.getName(), bucketPrefix + digest));
299    }
300
301}