/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.workmanager;

import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.platform.importer.mqueues.computation.AbstractComputation;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationContext;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;


/**
 * Computation that runs {@link Work} instances received as Java-serialized records.
 * <p>
 * Each incoming record's payload is deserialized into a {@link Work}, executed, cleaned up, and then a checkpoint is
 * requested on the computation context.
 *
 * @since 9.2
 */
public class ComputationWork extends AbstractComputation {

    /** Delay applied in {@link #init(ComputationContext)} before the computation starts, in milliseconds. */
    private static final long STARTUP_DELAY_MS = 10000;

    /**
     * Creates the computation.
     *
     * @param name the computation name, forwarded to the parent constructor
     */
    public ComputationWork(String name) {
        // NOTE(review): presumably 1 input stream and 0 output streams -- confirm against AbstractComputation
        super(name, 1, 0);
    }

    /**
     * Deserializes the record payload into a {@link Work}, runs it, and requests a checkpoint on success.
     * <p>
     * {@link Work#cleanUp(boolean, Exception)} is always invoked: with {@code (true, null)} when the work ran without
     * throwing, with {@code (false, e)} when it threw a {@link RuntimeException} (which is then rethrown), and with
     * {@code (false, null)} when it threw any other {@link Throwable}.
     *
     * @param context the computation context used to request a checkpoint
     * @param inputStreamName the name of the stream the record was read from (unused)
     * @param record the record whose {@code data} holds the serialized work
     * @throws RuntimeException if the payload cannot be deserialized or if the work fails
     */
    @Override
    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
        Work work = deserialize(record.data);
        boolean ok = false;
        Exception failure = null;
        try {
            work.run();
            ok = true;
        } catch (RuntimeException e) {
            // remember the error so that cleanUp is told about the failure, then let it propagate
            failure = e;
            throw e;
        } finally {
            work.cleanUp(ok, failure);
        }
        // only reached when the work ran without throwing
        context.askForCheckpoint();
    }

    /**
     * Initializes the computation, then blocks the calling thread for a fixed startup delay.
     *
     * @param context the computation context, forwarded to the parent implementation
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted; the thread's
     *             interrupt status is restored before throwing
     */
    @Override
    public void init(ComputationContext context) {
        super.init(context);
        try {
            // TODO: check if NXP-21969 provide a way to start computation only once framework is started
            // until then wait to prevent conflict during startup
            Thread.sleep(STARTUP_DELAY_MS);
        } catch (InterruptedException e) {
            // restore the interrupt status so that callers up the stack can observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Rebuilds a {@link Work} from its Java-serialized form.
     * <p>
     * NOTE(review): this uses native Java deserialization; it must only be fed bytes produced by a trusted producer
     * (e.g. {@link #serialize(Work)}) -- confirm that the record source cannot be written to by untrusted parties.
     *
     * @param data the serialized work
     * @return the deserialized work
     * @throws RuntimeException if the bytes cannot be read or the serialized class cannot be found
     */
    public static Work deserialize(byte[] data) {
        try (ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Work) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to deserialize Work: " + e.getMessage(), e);
        }
    }

    /**
     * Converts a {@link Work} into its Java-serialized form.
     *
     * @param work the work to serialize
     * @return the serialized bytes
     * @throws RuntimeException if the work cannot be written
     */
    public static byte[] serialize(Work work) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutput out = new ObjectOutputStream(bos)) {
            out.writeObject(work);
            // make sure everything buffered by the object stream reaches the byte array before reading it
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize Work: " + e.getMessage(), e);
        }
    }
}