001/* 002 * (C) Copyright 2012-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Optional; 027 028import org.nuxeo.ecm.core.work.api.Work; 029import org.nuxeo.ecm.core.work.api.Work.State; 030import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 031import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 032 033/** 034 * Implementation of a {@link WorkQueuing} using in-memory queuing. 035 * 036 * @since 5.8 037 */ 038public class MemoryWorkQueuing implements WorkQueuing { 039 040 protected final Map<String, MemoryBlockingQueue> allQueued = new HashMap<>(); 041 042 protected Listener listener; 043 044 public MemoryWorkQueuing(Listener listener) { 045 this.listener = listener; 046 } 047 048 @Override 049 public MemoryBlockingQueue init(WorkQueueDescriptor config) { 050 int capacity = config.getCapacity(); 051 if (capacity <= 0) { 052 capacity = -1; // unbounded 053 } 054 MemoryBlockingQueue queue = new MemoryBlockingQueue(config.id, this, capacity); 055 allQueued.put(queue.queueId, queue); 056 return queue; 057 } 058 059 @Override 060 public MemoryBlockingQueue getQueue(String queueId) { 061 return allQueued.get(queueId); 062 } 063 064 @Override 065 public void workSchedule(String queueId, Work work) { 066 listener.queueChanged(work, getQueue(queueId).workSchedule(work)); 067 } 068 069 @Override 070 public void workCanceled(String queueId, Work work) { 071 listener.queueChanged(work, getQueue(queueId).workCanceled(work)); 072 } 073 074 @Override 075 public void workRunning(String queueId, Work work) { 076 listener.queueChanged(work, getQueue(queueId).workRunning(work)); 077 } 078 079 @Override 080 public void workCompleted(String queueId, Work work) { 081 listener.queueChanged(work, getQueue(queueId).workCompleted(work)); 082 } 083 084 @Override 085 public void workReschedule(String queueId, Work work) { 086 listener.queueChanged(work, getQueue(queueId).workRescheduleRunning(work)); 087 } 088 089 Optional<Work> lookup(String workId) { 090 return allQueued.values() 091 .stream() 092 .map(queue -> queue.lookup(workId)) 093 .filter(work -> work != null) 094 .findAny(); 095 } 096 097 @Override 098 public Work find(String workId, State state) { 099 return lookup(workId) 100 .filter(work -> work.getWorkInstanceState() == state) 101 .orElse(null); 102 } 103 104 @Override 105 public boolean isWorkInState(String workId, State state) { 106 Optional<Work> option = lookup(workId); 107 if (!option.isPresent()) { 108 return false; 109 } 110 if (state == null) { 111 return true; 112 } 113 return option.get() 114 .getWorkInstanceState() 115 .equals(state); 116 } 117 118 @Override 119 public State getWorkState(String workId) { 120 return lookup(workId).map(Work::getWorkInstanceState) 121 .orElse(null); 122 } 123 124 @Override 125 public List<Work> listWork(String queueId, State state) { 126 MemoryBlockingQueue queue = getQueue(queueId); 127 if (state == null) { 128 return queue.list(); 129 } 130 switch (state) { 131 case SCHEDULED: 132 return queue.listScheduled(); 133 case RUNNING: 134 return queue.listRunning(); 135 default: 136 throw new IllegalArgumentException(String.valueOf(state)); 137 } 138 } 139 140 @Override 141 public List<String> listWorkIds(String queueId, State state) { 142 MemoryBlockingQueue queue = getQueue(queueId); 143 if (state == null) { 144 return queue.keys(); 145 } 146 switch (state) { 147 case SCHEDULED: 148 return queue.scheduledKeys(); 149 case RUNNING: 150 return queue.runningKeys(); 151 default: 152 throw new IllegalArgumentException(String.valueOf(state)); 153 } 154 } 155 156 @Override 157 public long count(String queueId, State state) { 158 switch (state) { 159 case SCHEDULED: 160 return metrics(queueId).scheduled.longValue(); 161 case RUNNING: 162 return metrics(queueId).running.longValue(); 163 default: 164 throw new IllegalArgumentException(String.valueOf(state)); 165 } 166 } 167 168 @Override 169 public synchronized void removeScheduled(String queueId, String workId) { 170 final MemoryBlockingQueue queue = getQueue(queueId); 171 Work work = queue.lookup(workId); 172 if (work == null) { 173 return; 174 } 175 work.setWorkInstanceState(State.UNKNOWN); 176 listener.queueChanged(work, queue.workCanceled(work)); 177 } 178 179 @Override 180 public int setSuspending(String queueId) { 181 MemoryBlockingQueue queue = getQueue(queueId); 182 List<Runnable> scheduled = new ArrayList<>(); 183 queue.drainTo(scheduled); 184 for (Runnable r : scheduled) { 185 Work work = WorkHolder.getWork(r); 186 listener.queueChanged(work, queue.workCanceled(work)); 187 } 188 return scheduled.size(); 189 } 190 191 @Override 192 public void setActive(String queueId, boolean value) { 193 WorkQueueMetrics metrics = getQueue(queueId).setActive(value); 194 if (value) { 195 listener.queueActivated(metrics); 196 } else { 197 listener.queueDeactivated(metrics); 198 } 199 } 200 201 @Override 202 public void listen(Listener listener) { 203 this.listener = listener; 204 } 205 206 @Override 207 public WorkQueueMetrics metrics(String queueId) { 208 return getQueue(queueId).metrics(); 209 } 210 211}