001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 */
019package org.nuxeo.ecm.core.bulk.computation;
020
021import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED;
022import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED;
023import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.UNKNOWN;
024
025import org.apache.logging.log4j.LogManager;
026import org.apache.logging.log4j.Logger;
027import org.nuxeo.ecm.core.bulk.BulkCodecs;
028import org.nuxeo.ecm.core.bulk.BulkService;
029import org.nuxeo.ecm.core.bulk.BulkServiceImpl;
030import org.nuxeo.ecm.core.bulk.message.BulkStatus;
031import org.nuxeo.lib.stream.codec.Codec;
032import org.nuxeo.lib.stream.computation.AbstractComputation;
033import org.nuxeo.lib.stream.computation.ComputationContext;
034import org.nuxeo.lib.stream.computation.Record;
035import org.nuxeo.runtime.api.Framework;
036
037/**
038 * Saves the status into a key value store.
039 * <p>
040 * Inputs:
041 * <ul>
042 * <li>i1: Reads {@link BulkStatus} sharded by command id</li>
043 * </ul>
044 * Outputs:
045 * <ul>
046 * <li>o1: Write {@link BulkStatus} full into the done stream.</li>
047 * </ul>
048 * </p>
049 *
050 * @since 10.2
051 */
052public class BulkStatusComputation extends AbstractComputation {
053
054    private static final Logger log = LogManager.getLogger(BulkStatusComputation.class);
055
056    public BulkStatusComputation(String name) {
057        super(name, 1, 1);
058    }
059
060    @Override
061    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
062        Codec<BulkStatus> codec = BulkCodecs.getStatusCodec();
063        BulkStatus recordStatus = codec.decode(record.getData());
064        BulkServiceImpl bulkService = (BulkServiceImpl) Framework.getService(BulkService.class);
065        BulkStatus status;
066        if (!recordStatus.isDelta()) {
067            status = recordStatus;
068        } else {
069            status = bulkService.getStatus(recordStatus.getId());
070            if (UNKNOWN.equals(status.getState())) {
071                // this requires a manual intervention, the kv store might have been lost
072                log.error("Stopping processing, unknown status for command: {}, offset: {}, record: {}.",
073                        recordStatus.getId(), context.getLastOffset(), record);
074                context.askForTermination();
075                return;
076            }
077            status.merge(recordStatus);
078        }
079        byte[] statusAsBytes = bulkService.setStatus(status);
080        if (status.getState() == COMPLETED || recordStatus.getState() == ABORTED) {
081            context.produceRecord(OUTPUT_1, status.getId(), statusAsBytes);
082        }
083        context.askForCheckpoint();
084    }
085}