001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.IOException; 024import java.nio.file.Paths; 025import java.util.Arrays; 026 027import org.apache.commons.lang3.StringUtils; 028import org.nuxeo.common.utils.URIUtils; 029import org.nuxeo.ecm.core.api.NuxeoException; 030import org.nuxeo.ecm.core.blob.BlobInfo; 031import org.nuxeo.ecm.core.redis.RedisAdmin; 032import org.nuxeo.ecm.core.redis.RedisExecutor; 033import org.nuxeo.importer.stream.message.DocumentMessage; 034import org.nuxeo.lib.stream.pattern.consumer.AbstractConsumer; 035import org.nuxeo.runtime.api.Framework; 036 037import com.fasterxml.jackson.core.JsonProcessingException; 038import com.fasterxml.jackson.databind.JsonNode; 039import com.fasterxml.jackson.databind.ObjectMapper; 040import com.fasterxml.jackson.databind.node.ObjectNode; 041 042/** 043 * Consumes DocumentMessage and send them to Redis which can be used as Gatling feeder. 044 * 045 * @since 10.2 046 */ 047public class RedisDocumentMessageConsumer extends AbstractConsumer<DocumentMessage> { 048 049 protected static final String DEFAULT_REDIS_PREFIX = "imp"; 050 051 protected static final String DOC_KEY_SUFFIX = "doc"; 052 053 protected static final String DATA_KEY_SUFFIX = "data"; 054 055 protected static final String FOLDER_KEY_SUFFIX = "folder"; 056 057 protected static final String BATCH_ID_TAG = "_BATCH_ID_"; 058 059 protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); 060 061 protected final byte[] addDocumentSHA; 062 063 protected final RedisExecutor redisExecutor; 064 065 protected final String redisPrefix; 066 067 public RedisDocumentMessageConsumer(String consumerId, String redisPrefix) { 068 super(consumerId); 069 if (StringUtils.isBlank(redisPrefix)) { 070 this.redisPrefix = DEFAULT_REDIS_PREFIX; 071 } else { 072 this.redisPrefix = redisPrefix; 073 } 074 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 075 try { 076 addDocumentSHA = bytes(redisAdmin.load("org.nuxeo.importer.stream", "add-document")); 077 } catch (IOException e) { 078 throw new NuxeoException("Cannot load Redis script", e); 079 } 080 redisExecutor = Framework.getService(RedisExecutor.class); 081 } 082 083 @Override 084 public void accept(DocumentMessage message) { 085 BlobInfo blobInfo = message.getBlobInfo(); 086 String blobPath = ""; 087 String blobFilename = ""; 088 String blobMimeType = ""; 089 String properties; 090 try { 091 if (blobInfo == null || StringUtils.isBlank(blobInfo.filename)) { 092 properties = getProperties(message); 093 } else { 094 blobPath = blobInfo.filename; 095 blobMimeType = blobInfo.mimeType; 096 blobFilename = Paths.get(blobInfo.filename).getFileName().toString(); 097 properties = getPropertiesWithBlob(message); 098 } 099 String parentPath = message.getParentPath(); 100 if (parentPath.startsWith("/")) { 101 parentPath = parentPath.substring(1); 102 } 103 String key = Paths.get(parentPath, message.getName()).toString(); 104 String docKey = redisPrefix + ":" + DOC_KEY_SUFFIX; 105 String folderKey = redisPrefix + ":" + FOLDER_KEY_SUFFIX; 106 String dataKey = redisPrefix + ":" + DATA_KEY_SUFFIX + ":" + key; 107 String level = String.valueOf(key.split("/").length); 108 String url = URIUtils.quoteURIPathComponent(key, false); 109 String payload = getPayload(message, properties); 110 redisExecutor.evalsha(addDocumentSHA, Arrays.asList(bytes(docKey), bytes(dataKey), bytes(folderKey)), 111 Arrays.asList(bytes(key), bytes(parentPath), bytes(message.getType()), bytes(message.getName()), 112 bytes(payload), bytes(url), bytes(level), bytes(blobPath), bytes(blobFilename), 113 bytes(blobMimeType))); 114 } catch (JsonProcessingException e) { 115 throw new NuxeoException("Cannot convert properties to json: " + message, e); 116 } 117 } 118 119 protected String getPayload(DocumentMessage message, String properties) { 120 return String.format("{\"entity-type\": \"document\", \"name\": \"%s\", \"type\": \"%s\", \"properties\": %s}", 121 message.getName(), message.getType(), properties); 122 } 123 124 protected String getPropertiesWithBlob(DocumentMessage message) { 125 JsonNode node = OBJECT_MAPPER.valueToTree(message.getProperties()); 126 ObjectNode fileContent = OBJECT_MAPPER.createObjectNode(); 127 // use a place holder for the batch id 128 fileContent.put("upload-batch", BATCH_ID_TAG); 129 fileContent.put("upload-fileId", "0"); 130 return ((ObjectNode) node).set("file:content", fileContent).toString(); 131 } 132 133 protected String getProperties(DocumentMessage message) throws JsonProcessingException { 134 return OBJECT_MAPPER.writeValueAsString(message.getProperties()); 135 } 136 137 protected byte[] bytes(String val) { 138 return val.getBytes(UTF_8); 139 } 140 141 @Override 142 public void begin() { 143 // no batching 144 } 145 146 @Override 147 public void commit() { 148 // no batching 149 } 150 151 @Override 152 public void rollback() { 153 // no batching 154 } 155 156}