001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019package org.nuxeo.ecm.core.bulk.computation; 020 021import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED; 022import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED; 023import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.UNKNOWN; 024 025import org.apache.logging.log4j.LogManager; 026import org.apache.logging.log4j.Logger; 027import org.nuxeo.ecm.core.bulk.BulkCodecs; 028import org.nuxeo.ecm.core.bulk.BulkService; 029import org.nuxeo.ecm.core.bulk.BulkServiceImpl; 030import org.nuxeo.ecm.core.bulk.message.BulkStatus; 031import org.nuxeo.lib.stream.codec.Codec; 032import org.nuxeo.lib.stream.computation.AbstractComputation; 033import org.nuxeo.lib.stream.computation.ComputationContext; 034import org.nuxeo.lib.stream.computation.Record; 035import org.nuxeo.runtime.api.Framework; 036 037/** 038 * Saves the status into a key value store. 039 * <p> 040 * Inputs: 041 * <ul> 042 * <li>i1: Reads {@link BulkStatus} sharded by command id</li> 043 * </ul> 044 * Outputs: 045 * <ul> 046 * <li>o1: Write {@link BulkStatus} full into the done stream.</li> 047 * </ul> 048 * </p> 049 * 050 * @since 10.2 051 */ 052public class BulkStatusComputation extends AbstractComputation { 053 054 private static final Logger log = LogManager.getLogger(BulkStatusComputation.class); 055 056 public BulkStatusComputation(String name) { 057 super(name, 1, 1); 058 } 059 060 @Override 061 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 062 Codec<BulkStatus> codec = BulkCodecs.getStatusCodec(); 063 BulkStatus recordStatus = codec.decode(record.getData()); 064 BulkServiceImpl bulkService = (BulkServiceImpl) Framework.getService(BulkService.class); 065 BulkStatus status; 066 if (!recordStatus.isDelta()) { 067 status = recordStatus; 068 } else { 069 status = bulkService.getStatus(recordStatus.getId()); 070 if (UNKNOWN.equals(status.getState())) { 071 // this requires a manual intervention, the kv store might have been lost 072 log.error("Stopping processing, unknown status for command: {}, offset: {}, record: {}.", 073 recordStatus.getId(), context.getLastOffset(), record); 074 context.askForTermination(); 075 return; 076 } 077 status.merge(recordStatus); 078 } 079 byte[] statusAsBytes = bulkService.setStatus(status); 080 if (status.getState() == COMPLETED || recordStatus.getState() == ABORTED) { 081 context.produceRecord(OUTPUT_1, status.getId(), statusAsBytes); 082 } 083 context.askForCheckpoint(); 084 } 085}