001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import java.io.IOException; 022import java.nio.file.Files; 023import java.nio.file.Path; 024import java.nio.file.Paths; 025 026import org.apache.commons.lang3.StringUtils; 027import org.nuxeo.ecm.core.api.NuxeoException; 028import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 029import org.nuxeo.importer.stream.consumer.watermarker.JpegWatermarker; 030import org.nuxeo.importer.stream.consumer.watermarker.Mp4Watermarker; 031import org.nuxeo.importer.stream.consumer.watermarker.TextWatermarker; 032import org.nuxeo.importer.stream.consumer.watermarker.Watermarker; 033import org.nuxeo.importer.stream.message.BlobMessage; 034 035/** 036 * Consume blobMessages and generate new blob by adding a unique watermark. 037 * 038 * @since 10.1 039 */ 040public class BlobWatermarkMessageConsumer extends BlobMessageConsumer { 041 042 protected final String prefix; 043 044 protected final String generatedBlobPath; 045 046 protected final Path outputPath; 047 048 protected final Watermarker textWatermarker = new TextWatermarker(); 049 050 protected final Watermarker jpegWatermarker = new JpegWatermarker(); 051 052 protected final Watermarker mp4Watermarker = new Mp4Watermarker(); 053 054 /** 055 * @param persistBlobPath when it is blank we don't delete generated blobs 056 */ 057 public BlobWatermarkMessageConsumer(String consumerId, String blobProviderName, BlobInfoWriter blobInfoWriter, 058 String watermarkPrefix, String persistBlobPath) { 059 super(consumerId, blobProviderName, blobInfoWriter); 060 this.prefix = watermarkPrefix; 061 this.generatedBlobPath = persistBlobPath; 062 if (StringUtils.isBlank(persistBlobPath)) { 063 try { 064 outputPath = Files.createTempDirectory("watermark"); 065 } catch (IOException e) { 066 throw new NuxeoException("Failed to create temp dir", e); 067 } 068 } else { 069 outputPath = Paths.get(persistBlobPath); 070 } 071 } 072 073 @Override 074 protected CloseableBlob getBlob(BlobMessage message) { 075 String watermark = getWatermarkString(); 076 switch (message.getMimetype()) { 077 case "text/plain": 078 return addWatermark(message, watermark, textWatermarker); 079 case "image/jpeg": 080 return addWatermark(message, watermark, jpegWatermarker); 081 case "video/mp4": 082 return addWatermark(message, watermark, mp4Watermarker); 083 default: 084 return super.getBlob(message); 085 } 086 } 087 088 protected CloseableBlob addWatermark(BlobMessage message, String watermark, Watermarker watermarker) { 089 Path output = watermarker.addWatermark(Paths.get(message.getPath()), outputPath, watermark); 090 Path fileToDelete = StringUtils.isBlank(generatedBlobPath) ? output : null; 091 return new CloseableBlob(new FileBlob(output.toFile(), message.getMimetype()), fileToDelete); 092 } 093 094 protected String getWatermarkString() { 095 return prefix + " " + System.currentTimeMillis(); 096 } 097}