/*
 * (C) Copyright 2012-2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 *     Benoit Delbosc
 */
package org.nuxeo.ecm.core.work;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;

/**
 * Implementation of a {@link WorkQueuing} using in-memory queuing.
 * <p>
 * Each queue is a {@link MemoryBlockingQueue} registered by id in {@link #allQueued}. Most operations delegate to
 * the queue looked up by id and forward the value it returns to the registered {@link Listener}.
 * <p>
 * Methods that take a queue id do not check that the queue exists: {@link #getQueue(String)} returns {@code null}
 * for an id that was never passed to {@link #init(WorkQueueDescriptor)}, and the delegating methods will then fail
 * with a {@link NullPointerException}.
 *
 * @since 5.8
 */
public class MemoryWorkQueuing implements WorkQueuing {

    /** Queues created by {@link #init(WorkQueueDescriptor)}, keyed by queue id. */
    protected final Map<String, MemoryBlockingQueue> allQueued = new HashMap<>();

    /** Listener notified of queue changes; can be replaced through {@link #listen(Listener)}. */
    protected Listener listener;

    /**
     * Creates a queuing that reports its changes to the given listener.
     *
     * @param listener the listener to notify
     */
    public MemoryWorkQueuing(Listener listener) {
        this.listener = listener;
    }

    /**
     * Creates the queue described by the given descriptor and registers it under its queue id, replacing any queue
     * previously registered under the same id.
     *
     * @param config the queue descriptor
     * @return the newly created queue
     */
    @Override
    public MemoryBlockingQueue init(WorkQueueDescriptor config) {
        int capacity = config.getCapacity();
        if (capacity <= 0) {
            capacity = -1; // unbounded
        }
        MemoryBlockingQueue queue = new MemoryBlockingQueue(config.id, this, capacity);
        allQueued.put(queue.queueId, queue);
        return queue;
    }

    /**
     * Returns the queue registered under the given id.
     *
     * @param queueId the queue id
     * @return the queue, or {@code null} if no queue is registered under this id
     */
    @Override
    public MemoryBlockingQueue getQueue(String queueId) {
        return allQueued.get(queueId);
    }

    /** Delegates to the queue's {@code workSchedule} and notifies the listener with the value it returns. */
    @Override
    public void workSchedule(String queueId, Work work) {
        listener.queueChanged(work, getQueue(queueId).workSchedule(work));
    }

    /** Delegates to the queue's {@code workCanceled} and notifies the listener with the value it returns. */
    @Override
    public void workCanceled(String queueId, Work work) {
        listener.queueChanged(work, getQueue(queueId).workCanceled(work));
    }

    /** Delegates to the queue's {@code workRunning} and notifies the listener with the value it returns. */
    @Override
    public void workRunning(String queueId, Work work) {
        listener.queueChanged(work, getQueue(queueId).workRunning(work));
    }

    /** Delegates to the queue's {@code workCompleted} and notifies the listener with the value it returns. */
    @Override
    public void workCompleted(String queueId, Work work) {
        listener.queueChanged(work, getQueue(queueId).workCompleted(work));
    }

    /** Delegates to the queue's {@code workRescheduleRunning} and notifies the listener with the value it returns. */
    @Override
    public void workReschedule(String queueId, Work work) {
        listener.queueChanged(work, getQueue(queueId).workRescheduleRunning(work));
    }

    /**
     * Searches every registered queue for a work with the given id.
     *
     * @param workId the work id
     * @return a work returned by one of the queues' {@code lookup}, or an empty optional if every queue returned
     *         {@code null}
     */
    Optional<Work> lookup(String workId) {
        return allQueued.values().stream().map(queue -> queue.lookup(workId)).filter(Objects::nonNull).findAny();
    }

    /**
     * Finds a work by id across all queues, restricted to the given state.
     *
     * @param workId the work id
     * @param state the expected state, or {@code null} for "scheduled or running" (see
     *            {@link #workHasState(Work, State)})
     * @return the matching work, or {@code null} if there is none
     */
    @Override
    public Work find(String workId, State state) {
        return lookup(workId).filter(work -> workHasState(work, state)).orElse(null);
    }

    /**
     * Checks whether a work with the given id exists in any queue and matches the given state.
     *
     * @param workId the work id
     * @param state the expected state, or {@code null} for "scheduled or running" (see
     *            {@link #workHasState(Work, State)})
     * @return {@code true} if such a work was found, {@code false} otherwise
     */
    @Override
    public boolean isWorkInState(String workId, State state) {
        return lookup(workId).filter(work -> workHasState(work, state)).isPresent();
    }

    /**
     * Returns the instance state of the work with the given id.
     *
     * @param workId the work id
     * @return the work instance state, or {@code null} if no queue knows this work
     */
    @Override
    public State getWorkState(String workId) {
        return lookup(workId).map(Work::getWorkInstanceState).orElse(null);
    }

    /**
     * Lists the works of a queue.
     *
     * @param queueId the queue id
     * @param state {@link State#SCHEDULED}, {@link State#RUNNING}, or {@code null} for the queue's full list
     * @return the list of works
     * @throws IllegalArgumentException if the state is any other value
     */
    @Override
    public List<Work> listWork(String queueId, State state) {
        MemoryBlockingQueue queue = getQueue(queueId);
        if (state == null) {
            return queue.list();
        }
        switch (state) {
        case SCHEDULED:
            return queue.listScheduled();
        case RUNNING:
            return queue.listRunning();
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    /**
     * Lists the ids of the works of a queue.
     *
     * @param queueId the queue id
     * @param state {@link State#SCHEDULED}, {@link State#RUNNING}, or {@code null} for all the queue's keys
     * @return the list of work ids
     * @throws IllegalArgumentException if the state is any other value
     */
    @Override
    public List<String> listWorkIds(String queueId, State state) {
        MemoryBlockingQueue queue = getQueue(queueId);
        if (state == null) {
            return queue.keys();
        }
        switch (state) {
        case SCHEDULED:
            return queue.scheduledKeys();
        case RUNNING:
            return queue.runningKeys();
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    /**
     * Counts the works of a queue from its metrics.
     * <p>
     * A {@code null} state is accepted, consistently with {@link #listWork(String, State)},
     * {@link #listWorkIds(String, State)} and {@link #workHasState(Work, State)}, and means "scheduled or running":
     * the result is then the sum of both counters.
     *
     * @param queueId the queue id
     * @param state {@link State#SCHEDULED}, {@link State#RUNNING}, or {@code null} for the sum of both
     * @return the number of works
     * @throws IllegalArgumentException if the state is any other value
     */
    @Override
    public long count(String queueId, State state) {
        if (state == null) {
            // switching on a null enum would throw a NullPointerException; read the metrics once so that both
            // counters come from the same metrics object
            WorkQueueMetrics metrics = metrics(queueId);
            return metrics.scheduled.longValue() + metrics.running.longValue();
        }
        switch (state) {
        case SCHEDULED:
            return metrics(queueId).scheduled.longValue();
        case RUNNING:
            return metrics(queueId).running.longValue();
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    /**
     * Removes a work from a queue: the work found by the queue's {@code lookup} has its instance state set to
     * {@link State#UNKNOWN}, is passed to the queue's {@code workCanceled}, and the listener is notified. Does
     * nothing if the queue does not know the work.
     *
     * @param queueId the queue id
     * @param workId the work id
     */
    @Override
    public synchronized void removeScheduled(String queueId, String workId) {
        final MemoryBlockingQueue queue = getQueue(queueId);
        Work work = queue.lookup(workId);
        if (work == null) {
            return;
        }
        work.setWorkInstanceState(State.UNKNOWN);
        listener.queueChanged(work, queue.workCanceled(work));
    }

    /**
     * Activates or deactivates a queue and notifies the listener with the metrics returned by the queue.
     *
     * @param queueId the queue id
     * @param value {@code true} to activate the queue, {@code false} to deactivate it
     */
    @Override
    public void setActive(String queueId, boolean value) {
        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
        if (value) {
            listener.queueActivated(metrics);
        } else {
            listener.queueDeactivated(metrics);
        }
    }

    /**
     * Replaces the listener notified of queue changes.
     *
     * @param listener the new listener
     */
    @Override
    public void listen(Listener listener) {
        this.listener = listener;
    }

    /**
     * Returns the metrics of a queue.
     *
     * @param queueId the queue id
     * @return the metrics reported by the queue
     */
    @Override
    public WorkQueueMetrics metrics(String queueId) {
        return getQueue(queueId).metrics();
    }

    /**
     * Returns {@code true} if the given state is not {@code null} and matches the state of the given work or if the
     * state is {@code null} and the work's state is either {@link State#SCHEDULED} or {@link State#RUNNING},
     * {@code false} otherwise.
     *
     * @since 9.3
     */
    protected static boolean workHasState(Work work, State state) {
        State workState = work.getWorkInstanceState();
        return state == null ? (workState == State.SCHEDULED || workState == State.RUNNING) : workState == state;
    }

}