001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.importer.stream.consumer;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.nio.file.Paths;
025import java.util.Arrays;
026
027import org.apache.commons.lang3.StringUtils;
028import org.nuxeo.common.utils.URIUtils;
029import org.nuxeo.ecm.core.api.NuxeoException;
030import org.nuxeo.ecm.core.blob.BlobInfo;
031import org.nuxeo.ecm.core.redis.RedisAdmin;
032import org.nuxeo.ecm.core.redis.RedisExecutor;
033import org.nuxeo.importer.stream.message.DocumentMessage;
034import org.nuxeo.lib.stream.pattern.consumer.AbstractConsumer;
035import org.nuxeo.runtime.api.Framework;
036
037import com.fasterxml.jackson.core.JsonProcessingException;
038import com.fasterxml.jackson.databind.JsonNode;
039import com.fasterxml.jackson.databind.ObjectMapper;
040import com.fasterxml.jackson.databind.node.ObjectNode;
041
042/**
043 * Consumes DocumentMessage and send them to Redis which can be used as Gatling feeder.
044 *
045 * @since 10.2
046 */
047public class RedisDocumentMessageConsumer extends AbstractConsumer<DocumentMessage> {
048
049    protected static final String DEFAULT_REDIS_PREFIX = "imp";
050
051    protected static final String DOC_KEY_SUFFIX = "doc";
052
053    protected static final String DATA_KEY_SUFFIX = "data";
054
055    protected static final String FOLDER_KEY_SUFFIX = "folder";
056
057    protected static final String BATCH_ID_TAG = "_BATCH_ID_";
058
059    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
060
061    protected final byte[] addDocumentSHA;
062
063    protected final RedisExecutor redisExecutor;
064
065    protected final String redisPrefix;
066
067    public RedisDocumentMessageConsumer(String consumerId, String redisPrefix) {
068        super(consumerId);
069        if (StringUtils.isBlank(redisPrefix)) {
070            this.redisPrefix = DEFAULT_REDIS_PREFIX;
071        } else {
072            this.redisPrefix = redisPrefix;
073        }
074        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
075        try {
076            addDocumentSHA = bytes(redisAdmin.load("org.nuxeo.importer.stream", "add-document"));
077        } catch (IOException e) {
078            throw new NuxeoException("Cannot load Redis script", e);
079        }
080        redisExecutor = Framework.getService(RedisExecutor.class);
081    }
082
083    @Override
084    public void accept(DocumentMessage message) {
085        BlobInfo blobInfo = message.getBlobInfo();
086        String blobPath = "";
087        String blobFilename = "";
088        String blobMimeType = "";
089        String properties;
090        try {
091            if (blobInfo == null || StringUtils.isBlank(blobInfo.filename)) {
092                properties = getProperties(message);
093            } else {
094                blobPath = blobInfo.filename;
095                blobMimeType = blobInfo.mimeType;
096                blobFilename = Paths.get(blobInfo.filename).getFileName().toString();
097                properties = getPropertiesWithBlob(message);
098            }
099            String parentPath = message.getParentPath();
100            if (parentPath.startsWith("/")) {
101                parentPath = parentPath.substring(1);
102            }
103            String key = Paths.get(parentPath, message.getName()).toString();
104            String docKey = redisPrefix + ":" + DOC_KEY_SUFFIX;
105            String folderKey = redisPrefix + ":" + FOLDER_KEY_SUFFIX;
106            String dataKey = redisPrefix + ":" + DATA_KEY_SUFFIX + ":" + key;
107            String level = String.valueOf(key.split("/").length);
108            String url = URIUtils.quoteURIPathComponent(key, false);
109            String payload = getPayload(message, properties);
110            redisExecutor.evalsha(addDocumentSHA, Arrays.asList(bytes(docKey), bytes(dataKey), bytes(folderKey)),
111                    Arrays.asList(bytes(key), bytes(parentPath), bytes(message.getType()), bytes(message.getName()),
112                            bytes(payload), bytes(url), bytes(level), bytes(blobPath), bytes(blobFilename),
113                            bytes(blobMimeType)));
114        } catch (JsonProcessingException e) {
115            throw new NuxeoException("Cannot convert properties to json: " + message, e);
116        }
117    }
118
119    protected String getPayload(DocumentMessage message, String properties) {
120        return String.format("{\"entity-type\": \"document\", \"name\": \"%s\", \"type\": \"%s\", \"properties\": %s}",
121                message.getName(), message.getType(), properties);
122    }
123
124    protected String getPropertiesWithBlob(DocumentMessage message) {
125        JsonNode node = OBJECT_MAPPER.valueToTree(message.getProperties());
126        ObjectNode fileContent = OBJECT_MAPPER.createObjectNode();
127        // use a place holder for the batch id
128        fileContent.put("upload-batch", BATCH_ID_TAG);
129        fileContent.put("upload-fileId", "0");
130        return ((ObjectNode) node).set("file:content", fileContent).toString();
131    }
132
133    protected String getProperties(DocumentMessage message) throws JsonProcessingException {
134        return OBJECT_MAPPER.writeValueAsString(message.getProperties());
135    }
136
137    protected byte[] bytes(String val) {
138        return val.getBytes(UTF_8);
139    }
140
141    @Override
142    public void begin() {
143        // no batching
144    }
145
146    @Override
147    public void commit() {
148        // no batching
149    }
150
151    @Override
152    public void rollback() {
153        // no batching
154    }
155
156}