001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.migration;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.time.Instant;
027import java.util.Collection;
028import java.util.List;
029import java.util.Objects;
030import java.util.Random;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.SynchronousQueue;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.function.BiConsumer;
036import java.util.function.Consumer;
037
038import org.apache.commons.io.IOUtils;
039import org.apache.commons.lang3.StringUtils;
040import org.apache.logging.log4j.LogManager;
041import org.apache.logging.log4j.Logger;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.kv.KeyValueService;
044import org.nuxeo.runtime.kv.KeyValueServiceImpl;
045import org.nuxeo.runtime.kv.KeyValueStore;
046import org.nuxeo.runtime.migration.MigrationDescriptor.MigrationStepDescriptor;
047import org.nuxeo.runtime.model.ComponentContext;
048import org.nuxeo.runtime.model.ComponentManager;
049import org.nuxeo.runtime.model.DefaultComponent;
050import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
051import org.nuxeo.runtime.pubsub.SerializableMessage;
052
053/**
054 * Implementation for the Migration Service.
055 * <p>
056 * Data about migration status is stored in the "migration" Key/Value Store in the following format:
057 *
058 * <pre>
059 * mymigration:lock         write lock, containing debug info about locker; set with a TTL
060 * mymigration              the state of the migration, if not running
061 * mymigration:step         the step of the migration, if running
062 * mymigration:starttime    the migration step start time (milliseconds since epoch)
063 * mymigration:pingtime     the migration step last ping time (milliseconds since epoch)
064 * mymigration:message      the migration step current message
065 * mymigration:num          the migration step current num
066 * mymigration:total        the migration step current total
067 * </pre>
068 *
069 * @since 9.3
070 */
071public class MigrationServiceImpl extends DefaultComponent implements MigrationService {
072
    private static final Logger log = LogManager.getLogger(MigrationServiceImpl.class);

    /** Name of the Key/Value Store in which migration status data is kept. */
    public static final String KEYVALUE_STORE_NAME = "migration";

    /** Name under which migration descriptors are looked up (see {@code getDescriptor}/{@code getDescriptors}). */
    public static final String XP_CONFIG = "configuration";

    /** Key suffix for the write lock of a migration. */
    public static final String LOCK = ":lock";

    /** Key suffix for the currently running step of a migration. */
    public static final String STEP = ":step";

    /** Key suffix for the start time of the running step (milliseconds since epoch). */
    public static final String START_TIME = ":starttime";

    /** Key suffix for the last ping time of the running step (milliseconds since epoch). */
    public static final String PING_TIME = ":pingtime";

    /** Key suffix for the current progress message of the running step. */
    public static final String PROGRESS_MESSAGE = ":message";

    /** Key suffix for the current progress number of the running step. */
    public static final String PROGRESS_NUM = ":num";

    /** Key suffix for the current progress total of the running step. */
    public static final String PROGRESS_TOTAL = ":total";

    /** TTL used when setting the write lock. */
    public static final long WRITE_LOCK_TTL = 10; // 10 sec for a few k/v writes is plenty enough

    /** Pub/sub topic on which the migration invalidator is initialized. */
    public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval";

    /** Framework property checked at start to decide whether the invalidator is registered. */
    public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled";

    /** Framework property holding the cluster node id. */
    public static final String NODE_ID_PROP = "repository.clustering.id";

    /** Source of randomness for the fallback node id and for the lock retry delay. */
    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)

    /** Executor running the migration steps; created in {@code start} and cleared in {@code stop}. */
    protected MigrationThreadPoolExecutor executor;

    /** Invalidation broker; only created in {@code start} when clustering is enabled. */
    protected MigrationInvalidator invalidator;
106
107    public static class MigrationInvalidation implements SerializableMessage {
108
109        private static final long serialVersionUID = 1L;
110
111        public final String id;
112
113        public MigrationInvalidation(String id) {
114            this.id = id;
115        }
116
117        @Override
118        public void serialize(OutputStream out) throws IOException {
119            IOUtils.write(id, out, UTF_8);
120        }
121
122        public static MigrationInvalidation deserialize(InputStream in) throws IOException {
123            String id = IOUtils.toString(in, UTF_8);
124            return new MigrationInvalidation(id);
125        }
126
127        @Override
128        public String toString() {
129            return getClass().getSimpleName() + "(" + id + ")";
130        }
131    }
132
133    public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> {
134
135        @Override
136        public MigrationInvalidation deserialize(InputStream in) throws IOException {
137            return MigrationInvalidation.deserialize(in);
138        }
139
140        @Override
141        public void receivedMessage(MigrationInvalidation message) {
142            String id = message.id;
143            Migrator migrator = getMigrator(id);
144            if (migrator == null) {
145                log.error("Unknown migration id received in invalidation: {}", id);
146                return;
147            }
148            migrator.notifyStatusChange();
149        }
150    }
151
152    protected static KeyValueStore getKeyValueStore() {
153        KeyValueService service = Framework.getService(KeyValueService.class);
154        Objects.requireNonNull(service, "Missing KeyValueService");
155        return service.getKeyValueStore(KEYVALUE_STORE_NAME);
156    }
157
158    public Collection<MigrationDescriptor> getMigrationDescriptors() {
159        return getDescriptors(XP_CONFIG);
160    }
161
162    @Override
163    public int getApplicationStartedOrder() {
164        return KeyValueServiceImpl.APPLICATION_STARTED_ORDER + 10;
165    }
166
167    /**
168     * Progress reporter that reports progress in the key/value store.
169     *
170     * @since 9.3
171     */
172    protected static class ProgressReporter {
173
174        protected final String id;
175
176        public ProgressReporter(String id) {
177            this.id = id;
178        }
179
180        /**
181         * Reports progress. If num or total are -2 then null is used.
182         */
183        public void reportProgress(String message, long num, long total, boolean ping) {
184            KeyValueStore keyValueStore = getKeyValueStore();
185            keyValueStore.put(id + PROGRESS_MESSAGE, message);
186            keyValueStore.put(id + PROGRESS_NUM, num == -2 ? null : String.valueOf(num));
187            keyValueStore.put(id + PROGRESS_TOTAL, total == -2 ? null : String.valueOf(total));
188            keyValueStore.put(id + PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null);
189        }
190    }
191
192    /**
193     * Migration context implementation that reports progress in the key/value store and can be shutdown.
194     *
195     * @since 9.3
196     */
197    protected static class MigrationContextImpl implements MigrationContext {
198
199        protected final ProgressReporter progressReporter;
200
201        protected volatile boolean shutdown;
202
203        public MigrationContextImpl(ProgressReporter progressReporter) {
204            this.progressReporter = progressReporter;
205        }
206
207        @Override
208        public void reportProgress(String message, long num, long total) {
209            progressReporter.reportProgress(message, num, total, true);
210        }
211
212        @Override
213        public void requestShutdown() {
214            shutdown = true;
215        }
216
217        @Override
218        public boolean isShutdownRequested() {
219            return shutdown || Thread.currentThread().isInterrupted();
220        }
221    }
222
223    /**
224     * Runnable for the migrator, that knows about the migration context.
225     *
226     * @since 9.3
227     */
228    protected static class MigratorWithContext implements Runnable {
229
230        protected final Consumer<MigrationContext> migration;
231
232        protected final MigrationContext migrationContext;
233
234        protected final BiConsumer<MigrationContext, Throwable> afterMigration;
235
236        public MigratorWithContext(Consumer<MigrationContext> migration, ProgressReporter progressReporter,
237                BiConsumer<MigrationContext, Throwable> afterMigration) {
238            this.migration = migration;
239            this.migrationContext = new MigrationContextImpl(progressReporter);
240            this.afterMigration = afterMigration;
241        }
242
243        @Override
244        public void run() {
245            migration.accept(migrationContext);
246        }
247
248        public void afterMigration(Throwable t) {
249            afterMigration.accept(migrationContext, t);
250        }
251
252        public void requestShutdown() {
253            migrationContext.requestShutdown();
254        }
255    }
256
257    /**
258     * Thread pool executor that records {@link Runnable}s to be able to request shutdown on them.
259     *
260     * @since 9.3
261     */
262    protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor {
263
264        protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<>();
265
266        public MigrationThreadPoolExecutor() {
267            // like Executors.newCachedThreadPool but with keepAliveTime of 0
268            super(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
269        }
270
271        @Override
272        protected void beforeExecute(Thread thread, Runnable runnable) {
273            runnables.add((MigratorWithContext) runnable);
274        }
275
276        @Override
277        protected void afterExecute(Runnable runnable, Throwable t) {
278            runnables.remove(runnable);
279            ((MigratorWithContext) runnable).afterMigration(t);
280        }
281
282        public void requestShutdown() {
283            runnables.forEach(MigratorWithContext::requestShutdown);
284        }
285    }
286
287    @Override
288    public void start(ComponentContext context) {
289        super.start(context);
290        if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) {
291            // register migration invalidator
292            String nodeId = Framework.getProperty(NODE_ID_PROP);
293            if (StringUtils.isBlank(nodeId)) {
294                nodeId = String.valueOf(RANDOM.nextLong());
295                log.warn("Missing cluster node id configuration, please define it explicitly "
296                        + "(usually through repository.clustering.id). Using random cluster node id instead: "
297                        + nodeId);
298            } else {
299                nodeId = nodeId.trim();
300            }
301            invalidator = new MigrationInvalidator();
302            invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, nodeId);
303            log.info("Registered migration invalidator for node: {}", nodeId);
304        } else {
305            log.info("Not registering a migration invalidator because clustering is not enabled");
306        }
307
308        executor = new MigrationThreadPoolExecutor();
309        Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
310
311            @Override
312            public void beforeStop(ComponentManager mgr, boolean isStandby) {
313                // flag all migration threads as shutdown requested, without interrupting them
314                executor.requestShutdown();
315            }
316
317            @Override
318            public void afterStop(ComponentManager mgr, boolean isStandby) {
319                Framework.getRuntime().getComponentManager().removeListener(this);
320            }
321        });
322    }
323
324    @Override
325    public void stop(ComponentContext context) throws InterruptedException {
326        // interrupt all migration tasks
327        executor.shutdownNow();
328        executor.awaitTermination(10, TimeUnit.SECONDS); // wait 10s for termination
329        executor = null;
330        super.stop(context);
331    }
332
333    @Override
334    public MigrationStatus getStatus(String id) {
335        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
336        if (descr == null) {
337            return null; // migration unknown
338        }
339        KeyValueStore kv = getKeyValueStore();
340        String state = kv.getString(id);
341        if (state != null) {
342            return new MigrationStatus(state);
343        }
344        String step = kv.getString(id + STEP);
345        if (step == null) {
346            state = descr.defaultState;
347            return new MigrationStatus(state);
348        }
349        long startTime = Long.parseLong(kv.getString(id + START_TIME));
350        long pingTime = Long.parseLong(kv.getString(id + PING_TIME));
351        String progressMessage = kv.getString(id + PROGRESS_MESSAGE);
352        long progressNum = Long.parseLong(kv.getString(id + PROGRESS_NUM));
353        long progressTotal = Long.parseLong(kv.getString(id + PROGRESS_TOTAL));
354        if (progressMessage == null) {
355            progressMessage = "";
356        }
357        return new MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal);
358    }
359
360    @Override
361    public String probeAndSetState(String id) {
362        Migrator migrator = getMigrator(id);
363        String state = migrator.probeState();
364        if (state != null) {
365            ProgressReporter progressReporter = new ProgressReporter(id);
366            setState(id, state, migrator, progressReporter);
367        }
368        return state;
369    }
370
371    protected void setState(String id, String state, Migrator migrator, ProgressReporter progressReporter) {
372        atomic(id, kv -> {
373            String currentState = kv.getString(id);
374            String currentStep = kv.getString(id + STEP);
375            if (currentState == null && currentStep != null) {
376                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
377            }
378            setState(id, state, progressReporter, kv);
379        });
380        migrator.notifyStatusChange();
381    }
382
383    protected void setState(String id, String state, ProgressReporter progressReporter, KeyValueStore kv) {
384        kv.put(id, state);
385        kv.put(id + STEP, (String) null);
386        kv.put(id + START_TIME, (String) null);
387        progressReporter.reportProgress(null, -2, -2, false);
388    }
389
390    @Override
391    public void runStep(String id, String step) {
392        Migrator migrator = getMigrator(id);
393        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
394        MigrationStepDescriptor stepDescr = descr.steps.get(step);
395        if (stepDescr == null) {
396            throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id);
397        }
398
399        ProgressReporter progressReporter = new ProgressReporter(id);
400
401        // switch to running
402        atomic(id, kv -> {
403            String state = kv.getString(id);
404            String currentStep = kv.getString(id + STEP);
405            if (state == null && currentStep == null) {
406                state = descr.defaultState;
407                if (!descr.states.containsKey(state)) {
408                    throw new IllegalArgumentException("Invalid default state: " + state + " for migration: " + id);
409                }
410            } else if (state == null) {
411                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
412            }
413            if (!descr.states.containsKey(state)) {
414                throw new IllegalArgumentException("Invalid current state: " + state + " for migration: " + id);
415            }
416            if (!stepDescr.fromState.equals(state)) {
417                throw new IllegalArgumentException(
418                        "Invalid step: " + step + " for migration: " + id + " in state: " + state);
419            }
420            String time = String.valueOf(System.currentTimeMillis());
421            kv.put(id + STEP, step);
422            kv.put(id + START_TIME, time);
423            progressReporter.reportProgress("", 0, -1, true);
424            kv.put(id, (String) null);
425        });
426
427        // allow notification of running step
428        migrator.notifyStatusChange();
429
430        Consumer<MigrationContext> migration = migrationContext -> {
431            Thread.currentThread().setName("Nuxeo-Migrator-" + id);
432            migrator.run(step, migrationContext);
433        };
434
435        BiConsumer<MigrationContext, Throwable> afterMigration = (migrationContext, t) -> {
436            if (t != null) {
437                log.error("Exception during execution of step: {} for migration: {}", step, id, t);
438            }
439            // after the migrator is finished, change state, except if shutdown is requested or exception
440            String state = t != null || migrationContext.isShutdownRequested()
441                    ? stepDescr.fromState
442                    : stepDescr.toState;
443            atomic(id, kv -> setState(id, state, progressReporter, kv));
444            // allow notification of new state
445            migrator.notifyStatusChange();
446        };
447
448        executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration));
449    }
450
451    protected Migrator getMigrator(String id) {
452        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
453        if (descr == null) {
454            throw new IllegalArgumentException("Unknown migration: " + id);
455        }
456        Class<?> klass = descr.klass;
457        if (!Migrator.class.isAssignableFrom(klass)) {
458            throw new RuntimeException(
459                    "Invalid class not implementing Migrator: " + klass.getName() + " for migration: " + id);
460        }
461        try {
462            return (Migrator) klass.getConstructor().newInstance();
463        } catch (ReflectiveOperationException e) {
464            throw new RuntimeException(e);
465        }
466    }
467
468    /**
469     * Executes something while setting a lock, retrying a few times if the lock is already set.
470     */
471    protected void atomic(String id, Consumer<KeyValueStore> consumer) {
472        KeyValueStore kv = getKeyValueStore();
473        String nodeid = Framework.getProperty(NODE_ID_PROP);
474        for (int i = 0; i < 5; i++) {
475            // the value of the lock is useful for debugging
476            String value = Instant.now() + " node=" + nodeid;
477            if (kv.compareAndSet(id + LOCK, null, value, WRITE_LOCK_TTL)) {
478                try {
479                    consumer.accept(kv);
480                    return;
481                } finally {
482                    kv.put(id + LOCK, (String) null);
483                }
484            }
485            try {
486                Thread.sleep((long) (RANDOM.nextInt(100) * i));
487            } catch (InterruptedException e) {
488                Thread.currentThread().interrupt();
489                throw new RuntimeException(e);
490            }
491        }
492        String currentLock = kv.getString(id + LOCK);
493        throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock);
494    }
495
496}