001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.migration;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static java.util.stream.Collectors.toList;
023
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.time.Instant;
028import java.util.Collection;
029import java.util.List;
030import java.util.Objects;
031import java.util.Random;
032import java.util.concurrent.CopyOnWriteArrayList;
033import java.util.concurrent.SynchronousQueue;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.function.BiConsumer;
037import java.util.function.Consumer;
038
039import org.apache.commons.io.IOUtils;
040import org.apache.logging.log4j.LogManager;
041import org.apache.logging.log4j.Logger;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.cluster.ClusterService;
044import org.nuxeo.runtime.kv.KeyValueService;
045import org.nuxeo.runtime.kv.KeyValueServiceImpl;
046import org.nuxeo.runtime.kv.KeyValueStore;
047import org.nuxeo.runtime.migration.MigrationDescriptor.MigrationStepDescriptor;
048import org.nuxeo.runtime.model.ComponentContext;
049import org.nuxeo.runtime.model.ComponentManager;
050import org.nuxeo.runtime.model.DefaultComponent;
051import org.nuxeo.runtime.model.Descriptor;
052import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
053import org.nuxeo.runtime.pubsub.SerializableMessage;
054
055/**
056 * Implementation for the Migration Service.
057 * <p>
058 * Data about migration status is stored in the "migration" Key/Value Store in the following format:
059 *
060 * <pre>
061 * mymigration:lock         write lock, containing debug info about locker; set with a TTL
062 * mymigration              the state of the migration, if not running
063 * mymigration:step         the step of the migration, if running
064 * mymigration:starttime    the migration step start time (milliseconds since epoch)
065 * mymigration:pingtime     the migration step last ping time (milliseconds since epoch)
066 * mymigration:message      the migration step current message
067 * mymigration:num          the migration step current num
068 * mymigration:total        the migration step current total
069 * </pre>
070 *
071 * @since 9.3
072 */
073public class MigrationServiceImpl extends DefaultComponent implements MigrationService {
074
    /** Logger used by this component and its nested classes. */
    private static final Logger log = LogManager.getLogger(MigrationServiceImpl.class);

    /** Name of the Key/Value Store in which the migration status entries are kept. */
    public static final String KEYVALUE_STORE_NAME = "migration";

    /** Name of the extension point from which the migration descriptors are read. */
    public static final String XP_CONFIG = "configuration";

    /** Suffix appended to the migration id to build the key of the write lock. */
    public static final String LOCK = ":lock";

    /** Suffix appended to the migration id to build the key of the running step. */
    public static final String STEP = ":step";

    /** Suffix appended to the migration id to build the key of the step start time. */
    public static final String START_TIME = ":starttime";

    /** Suffix appended to the migration id to build the key of the step last ping time. */
    public static final String PING_TIME = ":pingtime";

    /** Suffix appended to the migration id to build the key of the step progress message. */
    public static final String PROGRESS_MESSAGE = ":message";

    /** Suffix appended to the migration id to build the key of the step progress num. */
    public static final String PROGRESS_NUM = ":num";

    /** Suffix appended to the migration id to build the key of the step progress total. */
    public static final String PROGRESS_TOTAL = ":total";

    /** TTL passed to the key/value store when taking the write lock. */
    public static final long WRITE_LOCK_TTL = 10; // 10 sec for a few k/v writes is plenty enough

    /** Pub/sub topic on which the migration invalidator is initialized. */
    public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval";

    /** Source of the random back-off delays used when retrying to take the write lock. */
    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)

    /** Executor running the migration steps; created in start and discarded in stop. */
    protected MigrationThreadPoolExecutor executor;

    /** Id of this cluster node; only assigned in start when clustering is enabled. */
    protected String nodeId;

    /** Invalidation broker; only created in start when clustering is enabled, reset to null in stop. */
    protected MigrationInvalidator invalidator;
106
107    public static class MigrationInvalidation implements SerializableMessage {
108
109        private static final long serialVersionUID = 1L;
110
111        public final String id;
112
113        public MigrationInvalidation(String id) {
114            this.id = id;
115        }
116
117        @Override
118        public void serialize(OutputStream out) throws IOException {
119            IOUtils.write(id, out, UTF_8);
120        }
121
122        public static MigrationInvalidation deserialize(InputStream in) throws IOException {
123            String id = IOUtils.toString(in, UTF_8);
124            return new MigrationInvalidation(id);
125        }
126
127        @Override
128        public String toString() {
129            return getClass().getSimpleName() + "(" + id + ")";
130        }
131    }
132
133    public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> {
134
135        @Override
136        public MigrationInvalidation deserialize(InputStream in) throws IOException {
137            return MigrationInvalidation.deserialize(in);
138        }
139
140        @Override
141        public void receivedMessage(MigrationInvalidation message) {
142            String id = message.id;
143            Migrator migrator = getMigrator(id);
144            if (migrator == null) {
145                log.error("Unknown migration id received in invalidation: {}", id);
146                return;
147            }
148            // don't send again invalidation, call sub migrator directly
149            ((InvalidatorMigrator) migrator).migrator.notifyStatusChange();
150        }
151    }
152
153    /**
154     * A {@link Migrator migrator} wrapper to send invalidations to other nodes when calling
155     * {@link #notifyStatusChange()}.
156     *
157     * @since 10.10
158     */
159    public static class InvalidatorMigrator implements Migrator {
160
161        protected final String id;
162
163        protected final Migrator migrator;
164
165        protected final MigrationInvalidator invalidator;
166
167        public InvalidatorMigrator(String id, Migrator migrator, MigrationInvalidator invalidator) {
168            this.id = id;
169            this.migrator = migrator;
170            this.invalidator = invalidator;
171        }
172
173        @Override
174        public String probeState() {
175            return migrator.probeState();
176        }
177
178        @Override
179        public void run(String step, MigrationContext migrationContext) {
180            migrator.run(step, migrationContext);
181        }
182
183        @Override
184        public void notifyStatusChange() {
185            migrator.notifyStatusChange();
186            invalidator.sendMessage(new MigrationInvalidation(id));
187        }
188    }
189
190    protected static KeyValueStore getKeyValueStore() {
191        KeyValueService service = Framework.getService(KeyValueService.class);
192        Objects.requireNonNull(service, "Missing KeyValueService");
193        return service.getKeyValueStore(KEYVALUE_STORE_NAME);
194    }
195
196    public Collection<MigrationDescriptor> getMigrationDescriptors() {
197        return getDescriptors(XP_CONFIG);
198    }
199
200    @Override
201    public int getApplicationStartedOrder() {
202        return KeyValueServiceImpl.APPLICATION_STARTED_ORDER + 10;
203    }
204
205    /**
206     * Progress reporter that reports progress in the key/value store.
207     *
208     * @since 9.3
209     */
210    protected static class ProgressReporter {
211
212        protected final String id;
213
214        public ProgressReporter(String id) {
215            this.id = id;
216        }
217
218        /**
219         * Reports progress. If num or total are -2 then null is used.
220         */
221        public void reportProgress(String message, long num, long total, boolean ping) {
222            KeyValueStore keyValueStore = getKeyValueStore();
223            keyValueStore.put(id + PROGRESS_MESSAGE, message);
224            keyValueStore.put(id + PROGRESS_NUM, num == -2 ? null : String.valueOf(num));
225            keyValueStore.put(id + PROGRESS_TOTAL, total == -2 ? null : String.valueOf(total));
226            keyValueStore.put(id + PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null);
227        }
228    }
229
230    /**
231     * Migration context implementation that reports progress in the key/value store and can be shutdown.
232     *
233     * @since 9.3
234     */
235    protected static class MigrationContextImpl implements MigrationContext {
236
237        protected final ProgressReporter progressReporter;
238
239        protected volatile boolean shutdown;
240
241        public MigrationContextImpl(ProgressReporter progressReporter) {
242            this.progressReporter = progressReporter;
243        }
244
245        @Override
246        public void reportProgress(String message, long num, long total) {
247            progressReporter.reportProgress(message, num, total, true);
248        }
249
250        @Override
251        public void requestShutdown() {
252            shutdown = true;
253        }
254
255        @Override
256        public boolean isShutdownRequested() {
257            return shutdown || Thread.currentThread().isInterrupted();
258        }
259    }
260
261    /**
262     * Runnable for the migrator, that knows about the migration context.
263     *
264     * @since 9.3
265     */
266    protected static class MigratorWithContext implements Runnable {
267
268        protected final Consumer<MigrationContext> migration;
269
270        protected final MigrationContext migrationContext;
271
272        protected final BiConsumer<MigrationContext, Throwable> afterMigration;
273
274        public MigratorWithContext(Consumer<MigrationContext> migration, ProgressReporter progressReporter,
275                BiConsumer<MigrationContext, Throwable> afterMigration) {
276            this.migration = migration;
277            this.migrationContext = new MigrationContextImpl(progressReporter);
278            this.afterMigration = afterMigration;
279        }
280
281        @Override
282        public void run() {
283            migration.accept(migrationContext);
284        }
285
286        public void afterMigration(Throwable t) {
287            afterMigration.accept(migrationContext, t);
288        }
289
290        public void requestShutdown() {
291            migrationContext.requestShutdown();
292        }
293    }
294
295    /**
296     * Thread pool executor that records {@link Runnable}s to be able to request shutdown on them.
297     *
298     * @since 9.3
299     */
300    protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor {
301
302        protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<>();
303
304        public MigrationThreadPoolExecutor() {
305            // like Executors.newCachedThreadPool but with keepAliveTime of 0
306            super(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
307        }
308
309        @Override
310        protected void beforeExecute(Thread thread, Runnable runnable) {
311            runnables.add((MigratorWithContext) runnable);
312        }
313
314        @Override
315        protected void afterExecute(Runnable runnable, Throwable t) {
316            runnables.remove(runnable);
317            ((MigratorWithContext) runnable).afterMigration(t);
318        }
319
320        public void requestShutdown() {
321            runnables.forEach(MigratorWithContext::requestShutdown);
322        }
323    }
324
325    @Override
326    public void start(ComponentContext context) {
327        super.start(context);
328        ClusterService clusterService = Framework.getService(ClusterService.class);
329        if (clusterService.isEnabled()) {
330            // register invalidator
331            nodeId = clusterService.getNodeId();
332            invalidator = new MigrationInvalidator();
333            invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, nodeId);
334            log.info("Registered migration invalidator for node: {}", nodeId);
335        } else {
336            log.info("Not registering a migration invalidator because clustering is not enabled");
337        }
338
339        executor = new MigrationThreadPoolExecutor();
340        Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
341
342            @Override
343            public void beforeStop(ComponentManager mgr, boolean isStandby) {
344                // flag all migration threads as shutdown requested, without interrupting them
345                executor.requestShutdown();
346            }
347
348            @Override
349            public void afterStop(ComponentManager mgr, boolean isStandby) {
350                Framework.getRuntime().getComponentManager().removeListener(this);
351            }
352        });
353    }
354
355    @Override
356    public void stop(ComponentContext context) throws InterruptedException {
357        if (invalidator != null) {
358            invalidator.close();
359            invalidator = null;
360        }
361        // interrupt all migration tasks
362        executor.shutdownNow();
363        executor.awaitTermination(10, TimeUnit.SECONDS); // wait 10s for termination
364        executor = null;
365        super.stop(context);
366    }
367
368    @Override
369    public MigrationStatus getStatus(String id) {
370        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
371        if (descr == null) {
372            return null; // migration unknown
373        }
374        KeyValueStore kv = getKeyValueStore();
375        String state = kv.getString(id);
376        if (state != null) {
377            return new MigrationStatus(state);
378        }
379        String step = kv.getString(id + STEP);
380        if (step == null) {
381            state = descr.defaultState;
382            return new MigrationStatus(state);
383        }
384        long startTime = Long.parseLong(kv.getString(id + START_TIME));
385        long pingTime = Long.parseLong(kv.getString(id + PING_TIME));
386        String progressMessage = kv.getString(id + PROGRESS_MESSAGE);
387        long progressNum = Long.parseLong(kv.getString(id + PROGRESS_NUM));
388        long progressTotal = Long.parseLong(kv.getString(id + PROGRESS_TOTAL));
389        if (progressMessage == null) {
390            progressMessage = "";
391        }
392        return new MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal);
393    }
394
395    @Override
396    public String probeAndSetState(String id) {
397        Migrator migrator = getMigrator(id);
398        String state = migrator.probeState();
399        if (state != null) {
400            ProgressReporter progressReporter = new ProgressReporter(id);
401            setState(id, state, migrator, progressReporter);
402        }
403        return state;
404    }
405
406    protected void setState(String id, String state, Migrator migrator, ProgressReporter progressReporter) {
407        atomic(id, kv -> {
408            String currentState = kv.getString(id);
409            String currentStep = kv.getString(id + STEP);
410            if (currentState == null && currentStep != null) {
411                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
412            }
413            setState(id, state, progressReporter, kv);
414        });
415        migrator.notifyStatusChange();
416    }
417
418    protected void setState(String id, String state, ProgressReporter progressReporter, KeyValueStore kv) {
419        kv.put(id, state);
420        kv.put(id + STEP, (String) null);
421        kv.put(id + START_TIME, (String) null);
422        progressReporter.reportProgress(null, -2, -2, false);
423    }
424
425    @Override
426    public void runStep(String id, String step) {
427        Migrator migrator = getMigrator(id);
428        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
429        MigrationStepDescriptor stepDescr = descr.steps.get(step);
430        if (stepDescr == null) {
431            throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id);
432        }
433
434        ProgressReporter progressReporter = new ProgressReporter(id);
435
436        // switch to running
437        atomic(id, kv -> {
438            String state = kv.getString(id);
439            String currentStep = kv.getString(id + STEP);
440            if (state == null && currentStep == null) {
441                state = descr.defaultState;
442                if (!descr.states.containsKey(state)) {
443                    throw new IllegalArgumentException("Invalid default state: " + state + " for migration: " + id);
444                }
445            } else if (state == null) {
446                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
447            }
448            if (!descr.states.containsKey(state)) {
449                throw new IllegalArgumentException("Invalid current state: " + state + " for migration: " + id);
450            }
451            if (!stepDescr.fromState.equals(state)) {
452                throw new IllegalArgumentException(
453                        "Invalid step: " + step + " for migration: " + id + " in state: " + state);
454            }
455            String time = String.valueOf(System.currentTimeMillis());
456            kv.put(id + STEP, step);
457            kv.put(id + START_TIME, time);
458            progressReporter.reportProgress("", 0, -1, true);
459            kv.put(id, (String) null);
460        });
461
462        // allow notification of running step
463        migrator.notifyStatusChange();
464
465        Consumer<MigrationContext> migration = migrationContext -> {
466            Thread.currentThread().setName("Nuxeo-Migrator-" + id);
467            migrator.run(step, migrationContext);
468        };
469
470        BiConsumer<MigrationContext, Throwable> afterMigration = (migrationContext, t) -> {
471            if (t != null) {
472                log.error("Exception during execution of step: {} for migration: {}", step, id, t);
473            }
474            // after the migrator is finished, change state, except if shutdown is requested or exception
475            String state = t != null || migrationContext.isShutdownRequested() ? stepDescr.fromState
476                    : stepDescr.toState;
477            atomic(id, kv -> setState(id, state, progressReporter, kv));
478            // allow notification of new state
479            migrator.notifyStatusChange();
480        };
481
482        executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration));
483    }
484
485    protected Migrator getMigrator(String id) {
486        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
487        if (descr == null) {
488            throw new IllegalArgumentException("Unknown migration: " + id);
489        }
490        Class<?> klass = descr.klass;
491        if (!Migrator.class.isAssignableFrom(klass)) {
492            throw new RuntimeException(
493                    "Invalid class not implementing Migrator: " + klass.getName() + " for migration: " + id);
494        }
495        try {
496            Migrator migrator = (Migrator) klass.getConstructor().newInstance();
497            if (invalidator != null) {
498                migrator = new InvalidatorMigrator(id, migrator, invalidator);
499            }
500            return migrator;
501        } catch (ReflectiveOperationException e) {
502            throw new RuntimeException(e);
503        }
504    }
505
506    /**
507     * Executes something while setting a lock, retrying a few times if the lock is already set.
508     */
509    protected void atomic(String id, Consumer<KeyValueStore> consumer) {
510        KeyValueStore kv = getKeyValueStore();
511        for (int i = 0; i < 5; i++) {
512            // the value of the lock is useful for debugging
513            String value = Instant.now() + " node=" + nodeId;
514            if (kv.compareAndSet(id + LOCK, null, value, WRITE_LOCK_TTL)) {
515                try {
516                    consumer.accept(kv);
517                    return;
518                } finally {
519                    kv.put(id + LOCK, (String) null);
520                }
521            }
522            try {
523                Thread.sleep(RANDOM.nextInt(100) * i);
524            } catch (InterruptedException e) {
525                Thread.currentThread().interrupt();
526                throw new RuntimeException(e);
527            }
528        }
529        String currentLock = kv.getString(id + LOCK);
530        throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock);
531    }
532
533    @Override
534    public Migration getMigration(String id) {
535        MigrationDescriptor descriptor = getDescriptor(XP_CONFIG, id);
536        MigrationStatus status = getStatus(id);
537        if (descriptor == null || status == null) {
538            return null;
539        }
540        return Migration.from(descriptor, status);
541    }
542
543    @Override
544    public List<Migration> getMigrations() {
545        return getDescriptors(XP_CONFIG).stream().map(Descriptor::getId).map(this::getMigration).collect(toList());
546    }
547
548    /**
549     * @implNote Runs a migration if it has exactly one available step to run.
550     */
551    @Override
552    public void probeAndRun(String id) {
553        probeAndSetState(id);
554        MigrationStatus status = getStatus(id);
555        var steps = Migration.from(getDescriptor(XP_CONFIG, id), status).getSteps();
556        if (steps.size() != 1) {
557            throw new IllegalArgumentException(String.format(
558                    "Migration: %s must have only one runnable step from state: %s", id, status.getState())); // NOSONAR
559        }
560        runStep(id, steps.get(0).getId());
561    }
562
563}