001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019package org.nuxeo.ecm.core.bulk.computation; 020 021import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED; 022import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED; 023import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.UNKNOWN; 024 025import org.nuxeo.ecm.core.bulk.BulkCodecs; 026import org.nuxeo.ecm.core.bulk.BulkService; 027import org.nuxeo.ecm.core.bulk.BulkServiceImpl; 028import org.nuxeo.ecm.core.bulk.message.BulkStatus; 029import org.nuxeo.lib.stream.codec.Codec; 030import org.nuxeo.lib.stream.computation.AbstractComputation; 031import org.nuxeo.lib.stream.computation.ComputationContext; 032import org.nuxeo.lib.stream.computation.Record; 033import org.nuxeo.runtime.api.Framework; 034 035/** 036 * Saves the status into a key value store. 037 * <p> 038 * Inputs: 039 * <ul> 040 * <li>i1: Reads {@link BulkStatus} sharded by command id</li> 041 * </ul> 042 * Outputs: 043 * <ul> 044 * <li>o1: Write {@link BulkStatus} full into the done stream.</li> 045 * </ul> 046 * 047 * @since 10.2 048 */ 049public class BulkStatusComputation extends AbstractComputation { 050 051 public BulkStatusComputation(String name) { 052 super(name, 1, 1); 053 } 054 055 @Override 056 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 057 Codec<BulkStatus> codec = BulkCodecs.getStatusCodec(); 058 BulkStatus recordStatus = codec.decode(record.getData()); 059 BulkServiceImpl bulkService = (BulkServiceImpl) Framework.getService(BulkService.class); 060 BulkStatus status; 061 if (!recordStatus.isDelta()) { 062 status = recordStatus; 063 } else { 064 status = bulkService.getStatus(recordStatus.getId()); 065 if (UNKNOWN.equals(status.getState())) { 066 // this requires a manual intervention, the kv store might have been lost 067 throw new IllegalStateException( 068 String.format("Status with unknown command: %s, offset: %s, record: %s.", recordStatus.getId(), 069 context.getLastOffset(), record)); 070 } 071 status.merge(recordStatus); 072 } 073 byte[] statusAsBytes = bulkService.setStatus(status); 074 if (status.getState() == COMPLETED || recordStatus.getState() == ABORTED) { 075 context.produceRecord(OUTPUT_1, status.getId(), statusAsBytes); 076 } 077 context.askForCheckpoint(); 078 } 079}