001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.importer.stream.automation;
020
021import static org.apache.commons.lang3.StringUtils.isEmpty;
022import static org.nuxeo.importer.stream.automation.BlobConsumers.DEFAULT_LOG_CONFIG;
023
024import java.io.File;
025import java.util.concurrent.ExecutionException;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.ecm.automation.OperationContext;
030import org.nuxeo.ecm.automation.OperationException;
031import org.nuxeo.ecm.automation.core.Constants;
032import org.nuxeo.ecm.automation.core.annotations.Context;
033import org.nuxeo.ecm.automation.core.annotations.Operation;
034import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
035import org.nuxeo.ecm.automation.core.annotations.Param;
036import org.nuxeo.ecm.core.api.NuxeoPrincipal;
037import org.nuxeo.importer.stream.message.BlobMessage;
038import org.nuxeo.importer.stream.producer.FileBlobMessageProducerFactory;
039import org.nuxeo.lib.stream.log.LogManager;
040import org.nuxeo.lib.stream.pattern.producer.ProducerPool;
041import org.nuxeo.runtime.api.Framework;
042import org.nuxeo.runtime.stream.StreamService;
043
044/**
045 * Create blob messages using a list of files.
046 *
047 * @since 10.2
048 */
049@Operation(id = FileBlobProducers.ID, category = Constants.CAT_SERVICES, label = "Produces blobs from a list of files", since = "10.2", description = "Produces blobs from a list of files.")
050public class FileBlobProducers {
051    private static final Log log = LogFactory.getLog(FileBlobProducers.class);
052
053    public static final String ID = "StreamImporter.runFileBlobProducers";
054
055    public static final String DEFAULT_BLOB_LOG_NAME = "import-blob";
056
057    @Context
058    protected OperationContext ctx;
059
060    @Param(name = "listFile")
061    protected String listFile;
062
063    @Param(name = "nbBlobs", required = false)
064    protected Integer nbBlobs;
065
066    @Param(name = "nbThreads", required = false)
067    protected Integer nbThreads = 1;
068
069    @Param(name = "logName", required = false)
070    protected String logName;
071
072    @Param(name = "logSize", required = false)
073    protected Integer logSize;
074
075    @Param(name = "logConfig", required = false)
076    protected String logConfig;
077
078    @Param(name = "basePath", required = false)
079    protected String basePath;
080
081    @OperationMethod
082    public void run() throws OperationException {
083        checkAccess(ctx);
084        StreamService service = Framework.getService(StreamService.class);
085        LogManager manager = service.getLogManager(getLogConfig());
086        try {
087            manager.createIfNotExists(getLogName(), getLogSize());
088            try (ProducerPool<BlobMessage> producers = new ProducerPool<>(getLogName(), manager,
089                    new FileBlobMessageProducerFactory(getListFile(), getBasePath(), getNbBlobs()),
090                    nbThreads.shortValue())) {
091                producers.start().get();
092            }
093        } catch (InterruptedException e) {
094            Thread.currentThread().interrupt();
095            log.warn("Operation interrupted");
096            throw new RuntimeException(e);
097        } catch (ExecutionException e) {
098            log.error("Operation fails", e);
099            throw new OperationException(e);
100        }
101    }
102
103    protected long getNbBlobs() {
104        if (nbBlobs == null) {
105            // this means all the files listed in the files
106            return 0;
107        }
108        return nbBlobs.longValue();
109    }
110
111    protected int getLogSize() {
112        if (logSize != null && logSize > 0) {
113            return logSize;
114        }
115        return nbThreads;
116    }
117
118    protected String getBasePath() {
119        if (isEmpty(basePath)) {
120            return null;
121        }
122        if (!new File(basePath).exists()) {
123            throw new IllegalArgumentException("Can not access basePath: " + basePath);
124        }
125        return basePath;
126    }
127
128    protected File getListFile() {
129        File ret = new File(listFile);
130        if (!ret.exists() || !ret.canRead()) {
131            throw new IllegalArgumentException("Can not access or read listFile: " + listFile);
132        }
133        return ret;
134    }
135
136    protected String getLogConfig() {
137        if (logConfig != null) {
138            return logConfig;
139        }
140        return DEFAULT_LOG_CONFIG;
141    }
142
143    protected String getLogName() {
144        if (logName != null) {
145            return logName;
146        }
147        return DEFAULT_BLOB_LOG_NAME;
148    }
149
150    protected static void checkAccess(OperationContext context) {
151        NuxeoPrincipal principal = context.getPrincipal();
152        if (principal == null || !principal.isAdministrator()) {
153            throw new RuntimeException("Unauthorized access: " + principal);
154        }
155    }
156}