001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.ecm.core.work;
020
021import org.apache.logging.log4j.LogManager;
022import org.apache.logging.log4j.Logger;
023import org.nuxeo.ecm.core.work.api.Work;
024import org.nuxeo.runtime.api.Framework;
025import org.nuxeo.runtime.kv.KeyValueService;
026import org.nuxeo.runtime.kv.KeyValueStore;
027
028/**
029 * Work state helper to handle, out-of-API, distributed, work states.<br>
030 *
031 * @since 10.2
032 */
033public class WorkStateHelper {
034    private static final Logger log = LogManager.getLogger(WorkStateHelper.class);
035
036    protected static final String KV_NAME = "workManager";
037
038    protected static final String STATE_SUFFIX = ":state";
039
040    protected static final String OFFSET_SUFFIX = ":offset";
041
042    protected static final String GROUP_JOIN_COUNT_SUFFIX = ":group";
043
044    protected static final long GROUP_JOIN_COUNT_TTL_SECONDS = 600;
045
046    protected static final String CANCELED = "canceled";
047
048    protected static KeyValueStore getKeyValueStore() {
049        return Framework.getService(KeyValueService.class).getKeyValueStore(KV_NAME);
050    }
051
052    /**
053     * Returns the last offset created for a given work id.
054     * <p>
055     *
056     * @param workId id of the work whose we want the last offset
057     * @return the last offset or -1 for convenience
058     * @since 10.3
059     */
060    protected static long getLastOffset(String workId) {
061        String stringOffset = getKeyValueStore().getString(getOffsetKey(workId));
062        return stringOffset == null ? -1 : Long.parseLong(stringOffset);
063    }
064
065    protected static String getOffsetKey(String workId) {
066        return workId + OFFSET_SUFFIX;
067    }
068
069    protected static Work.State getState(String workId) {
070        String stringState = getKeyValueStore().getString(getStateKey(workId));
071        if (stringState == null || CANCELED.equals(stringState)) {
072            log.debug("getState work: {}, state: {}", workId, stringState);
073        }
074        return stringState == null || CANCELED.equals(stringState) ? null : Work.State.valueOf(stringState);
075    }
076
077    protected static String getStateKey(String workId) {
078        return workId + STATE_SUFFIX;
079    }
080
081    protected static String getGroupKey(String group) {
082        return group + GROUP_JOIN_COUNT_SUFFIX;
083    }
084
085    protected static boolean isCanceled(String workId) {
086        return CANCELED.equals(getKeyValueStore().getString(getStateKey(workId)));
087    }
088
089    protected static void setCanceled(String workId) {
090        log.debug("Canceling work: {}", workId);
091        getKeyValueStore().put(getStateKey(workId), CANCELED, 0);
092    }
093
094    protected static void setLastOffset(String workId, Long offset, long ttl) {
095        getKeyValueStore().put(getOffsetKey(workId), offset == null ? null : offset, ttl);
096    }
097
098    protected static void setState(String workId, Work.State state, long ttl) {
099        log.debug("setState work: {}, state: {}, ttl: {}", workId, state, ttl);
100        getKeyValueStore().put(getStateKey(workId), state == null ? null : state.toString(), ttl);
101    }
102
103    // @since 11.1
104    protected static void addGroupJoinWork(String group) {
105        getKeyValueStore().addAndGet(getGroupKey(group), 1);
106    }
107
108    // @since 11.1
109    protected static boolean removeGroupJoinWork(String group) {
110        long count = getKeyValueStore().addAndGet(getGroupKey(group), -1);
111        if (count <= 0) {
112            getKeyValueStore().setTTL(group, GROUP_JOIN_COUNT_TTL_SECONDS);
113            return true;
114        }
115        return false;
116    }
117
118    private WorkStateHelper() {
119        // hide constructor
120    }
121
122}