001/* 002 * (C) Copyright 2012-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026 027import org.nuxeo.ecm.core.work.api.Work; 028import org.nuxeo.ecm.core.work.api.Work.State; 029import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 031 032/** 033 * Implementation of a {@link WorkQueuing} using in-memory queuing. 034 * 035 * @since 5.8 036 */ 037public class MemoryWorkQueuing implements WorkQueuing { 038 039 protected final Map<String, MemoryBlockingQueue> allQueued = new HashMap<>(); 040 041 protected Listener listener; 042 043 public MemoryWorkQueuing(Listener listener) { 044 this.listener = listener; 045 } 046 047 @Override 048 public MemoryBlockingQueue init(WorkQueueDescriptor config) { 049 int capacity = config.getCapacity(); 050 if (capacity <= 0) { 051 capacity = -1; // unbounded 052 } 053 MemoryBlockingQueue queue = new MemoryBlockingQueue(config.id, this, capacity); 054 allQueued.put(queue.queueId, queue); 055 return queue; 056 } 057 058 @Override 059 public MemoryBlockingQueue getQueue(String queueId) { 060 return allQueued.get(queueId); 061 } 062 063 @Override 064 public void workSchedule(String queueId, Work work) { 065 listener.queueChanged(work, getQueue(queueId).workSchedule(work)); 066 } 067 068 @Override 069 public void workCanceled(String queueId, Work work) { 070 listener.queueChanged(work, getQueue(queueId).workCanceled(work)); 071 } 072 073 @Override 074 public void workRunning(String queueId, Work work) { 075 listener.queueChanged(work, getQueue(queueId).workRunning(work)); 076 } 077 078 @Override 079 public void workCompleted(String queueId, Work work) { 080 listener.queueChanged(work, getQueue(queueId).workCompleted(work)); 081 } 082 083 @Override 084 public void workReschedule(String queueId, Work work) { 085 listener.queueChanged(work, getQueue(queueId).workRescheduleRunning(work)); 086 } 087 088 Optional<Work> lookup(String workId) { 089 return allQueued.values() 090 .stream() 091 .map(queue -> queue.lookup(workId)) 092 .filter(work -> work != null) 093 .findAny(); 094 } 095 096 @Override 097 public Work find(String workId, State state) { 098 return lookup(workId) 099 .filter(work -> work.getWorkInstanceState() == state) 100 .orElse(null); 101 } 102 103 @Override 104 public boolean isWorkInState(String workId, State state) { 105 Optional<Work> option = lookup(workId); 106 if (!option.isPresent()) { 107 return false; 108 } 109 if (state == null) { 110 return true; 111 } 112 return option.get() 113 .getWorkInstanceState() 114 .equals(state); 115 } 116 117 @Override 118 public State getWorkState(String workId) { 119 return lookup(workId).map(Work::getWorkInstanceState) 120 .orElse(null); 121 } 122 123 @Override 124 public List<Work> listWork(String queueId, State state) { 125 MemoryBlockingQueue queue = getQueue(queueId); 126 if (state == null) { 127 return queue.list(); 128 } 129 switch (state) { 130 case SCHEDULED: 131 return queue.listScheduled(); 132 case RUNNING: 133 return queue.listRunning(); 134 default: 135 throw new IllegalArgumentException(String.valueOf(state)); 136 } 137 } 138 139 @Override 140 public List<String> listWorkIds(String queueId, State state) { 141 MemoryBlockingQueue queue = getQueue(queueId); 142 if (state == null) { 143 return queue.keys(); 144 } 145 switch (state) { 146 case SCHEDULED: 147 return queue.scheduledKeys(); 148 case RUNNING: 149 return queue.runningKeys(); 150 default: 151 throw new IllegalArgumentException(String.valueOf(state)); 152 } 153 } 154 155 @Override 156 public long count(String queueId, State state) { 157 switch (state) { 158 case SCHEDULED: 159 return metrics(queueId).scheduled.longValue(); 160 case RUNNING: 161 return metrics(queueId).running.longValue(); 162 default: 163 throw new IllegalArgumentException(String.valueOf(state)); 164 } 165 } 166 167 @Override 168 public synchronized void removeScheduled(String queueId, String workId) { 169 final MemoryBlockingQueue queue = getQueue(queueId); 170 Work work = queue.lookup(workId); 171 if (work == null) { 172 return; 173 } 174 work.setWorkInstanceState(State.UNKNOWN); 175 listener.queueChanged(work, queue.workCanceled(work)); 176 } 177 178 @Override 179 public void setActive(String queueId, boolean value) { 180 WorkQueueMetrics metrics = getQueue(queueId).setActive(value); 181 if (value) { 182 listener.queueActivated(metrics); 183 } else { 184 listener.queueDeactivated(metrics); 185 } 186 } 187 188 @Override 189 public void listen(Listener listener) { 190 this.listener = listener; 191 } 192 193 @Override 194 public WorkQueueMetrics metrics(String queueId) { 195 return getQueue(queueId).metrics(); 196 } 197 198}