/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     pierre
 */
package org.nuxeo.ecm.core.bulk.action.computation;

import java.io.Serializable;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;

import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.bulk.message.DataBucket;
import org.nuxeo.ecm.core.io.download.DownloadService;
import org.nuxeo.ecm.core.transientstore.api.TransientStore;
import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;

/**
 * Computation that decodes a {@link DataBucket} record, resolves the blob it references, puts that blob into the
 * download transient store under the command id, and reports the download URL in the command status.
 *
 * @since 10.3
 */
public class ExposeBlob extends AbstractTransientBlobComputation {

    public static final String NAME = "exposeBlob";

    public ExposeBlob() {
        super(NAME);
    }

    /**
     * Handles one record: decodes it as a {@link DataBucket}, exposes the referenced blob for download and sends a
     * status delta carrying the processed count and the download URL, then asks for a checkpoint.
     *
     * @param context the computation context used to send the status update and request the checkpoint
     * @param documentIdsStreamName the name of the input stream (not used by this computation)
     * @param record the record whose data is a serialized {@link DataBucket}
     */
    @Override
    public void processRecord(ComputationContext context, String documentIdsStreamName, Record record) {
        DataBucket bucket = BulkCodecs.getDataBucketCodec().decode(record.getData());
        String commandId = bucket.getCommandId();
        long processedCount = bucket.getCount();

        // the action of the command is used as the name of the store passed to getBlob
        BulkService bulkService = Framework.getService(BulkService.class);
        BulkStatus commandStatus = bulkService.getStatus(commandId);
        String sourceStoreName = commandStatus.getAction();
        Blob exposedBlob = getBlob(bucket.getDataAsString(), sourceStoreName);

        publishToDownloadStore(commandId, exposedBlob);

        // build the status delta: processed count plus the download URL as result
        BulkStatus statusDelta = BulkStatus.deltaOf(commandId);
        statusDelta.setProcessed(processedCount);
        DownloadService downloadService = Framework.getService(DownloadService.class);
        String downloadUrl = downloadService.getDownloadUrl(commandId);
        Map<String, Serializable> downloadResult = Collections.singletonMap("url", downloadUrl);
        statusDelta.setResult(downloadResult);

        AbstractBulkComputation.updateStatus(context, statusDelta);
        context.askForCheckpoint();
    }

    /**
     * Puts the given blob, as a single-element list, into the download transient store under the command id and
     * marks that entry as completed.
     */
    private void publishToDownloadStore(String commandId, Blob blob) {
        TransientStoreService storeService = Framework.getService(TransientStoreService.class);
        TransientStore downloadStore = storeService.getStore(DownloadService.TRANSIENT_STORE_STORE_NAME);
        downloadStore.putBlobs(commandId, Collections.singletonList(blob));
        downloadStore.setCompleted(commandId, true);
    }

}