001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.ecm.core.bulk.action.computation; 020 021import java.io.Serializable; 022import java.util.Collections; 023import java.util.Map; 024 025import org.nuxeo.ecm.core.api.Blob; 026import org.nuxeo.ecm.core.bulk.BulkCodecs; 027import org.nuxeo.ecm.core.bulk.BulkService; 028import org.nuxeo.ecm.core.bulk.message.BulkStatus; 029import org.nuxeo.ecm.core.bulk.message.DataBucket; 030import org.nuxeo.ecm.core.io.download.DownloadService; 031import org.nuxeo.ecm.core.transientstore.api.TransientStore; 032import org.nuxeo.ecm.core.transientstore.api.TransientStoreService; 033import org.nuxeo.lib.stream.codec.Codec; 034import org.nuxeo.lib.stream.computation.ComputationContext; 035import org.nuxeo.lib.stream.computation.Record; 036import org.nuxeo.runtime.api.Framework; 037 038/** 039 * @since 10.3 040 */ 041public class ExposeBlob extends AbstractTransientBlobComputation { 042 043 public static final String NAME = "bulk/exposeBlob"; 044 045 public ExposeBlob() { 046 super(NAME); 047 } 048 049 @Override 050 public void processRecord(ComputationContext context, String documentIdsStreamName, Record record) { 051 Codec<DataBucket> codec = BulkCodecs.getDataBucketCodec(); 052 DataBucket in = codec.decode(record.getData()); 053 String commandId = in.getCommandId(); 054 long documents = in.getCount(); 055 056 String storeName = Framework.getService(BulkService.class).getStatus(commandId).getAction(); 057 Blob blob = getBlob(in.getDataAsString(), storeName); 058 // store it in download transient store 059 TransientStore store = Framework.getService(TransientStoreService.class) 060 .getStore(DownloadService.TRANSIENT_STORE_STORE_NAME); 061 store.putBlobs(commandId, Collections.singletonList(blob)); 062 store.setCompleted(commandId, true); 063 064 // update the command status 065 BulkStatus delta = BulkStatus.deltaOf(commandId); 066 delta.setProcessed(documents); 067 String url = Framework.getService(DownloadService.class).getDownloadUrl(commandId); 068 Map<String, Serializable> result = Collections.singletonMap("url", url); 069 delta.setResult(result); 070 AbstractBulkComputation.updateStatus(context, delta); 071 context.askForCheckpoint(); 072 } 073 074}