001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.migration;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.time.Instant;
027import java.util.Collection;
028import java.util.List;
029import java.util.Objects;
030import java.util.Random;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.SynchronousQueue;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.function.BiConsumer;
036import java.util.function.Consumer;
037
038import org.apache.commons.io.IOUtils;
039import org.apache.commons.lang3.StringUtils;
040import org.apache.logging.log4j.LogManager;
041import org.apache.logging.log4j.Logger;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.kv.KeyValueService;
044import org.nuxeo.runtime.kv.KeyValueServiceImpl;
045import org.nuxeo.runtime.kv.KeyValueStore;
046import org.nuxeo.runtime.migration.MigrationDescriptor.MigrationStepDescriptor;
047import org.nuxeo.runtime.model.ComponentContext;
048import org.nuxeo.runtime.model.ComponentManager;
049import org.nuxeo.runtime.model.DefaultComponent;
050import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
051import org.nuxeo.runtime.pubsub.SerializableMessage;
052
053/**
054 * Implementation for the Migration Service.
055 * <p>
056 * Data about migration status is stored in the "migration" Key/Value Store in the following format:
057 *
058 * <pre>
059 * mymigration:lock         write lock, containing debug info about locker; set with a TTL
060 * mymigration              the state of the migration, if not running
061 * mymigration:step         the step of the migration, if running
062 * mymigration:starttime    the migration step start time (milliseconds since epoch)
063 * mymigration:pingtime     the migration step last ping time (milliseconds since epoch)
064 * mymigration:message      the migration step current message
065 * mymigration:num          the migration step current num
066 * mymigration:total        the migration step current total
067 * </pre>
068 *
069 * @since 9.3
070 */
071public class MigrationServiceImpl extends DefaultComponent implements MigrationService {
072
    private static final Logger log = LogManager.getLogger(MigrationServiceImpl.class);

    /** Name of the Key/Value Store in which migration status data is kept. */
    public static final String KEYVALUE_STORE_NAME = "migration";

    /** Extension point from which the migration descriptors are read. */
    public static final String XP_CONFIG = "configuration";

    /** Key suffix of the write lock entry for a migration. */
    public static final String LOCK = ":lock";

    /** Key suffix of the currently running step of a migration. */
    public static final String STEP = ":step";

    /** Key suffix of the step start time (milliseconds since epoch). */
    public static final String START_TIME = ":starttime";

    /** Key suffix of the step last ping time (milliseconds since epoch). */
    public static final String PING_TIME = ":pingtime";

    /** Key suffix of the step current progress message. */
    public static final String PROGRESS_MESSAGE = ":message";

    /** Key suffix of the step current progress num. */
    public static final String PROGRESS_NUM = ":num";

    /** Key suffix of the step current progress total. */
    public static final String PROGRESS_TOTAL = ":total";

    /** TTL passed when acquiring the write lock. */
    public static final long WRITE_LOCK_TTL = 10; // 10 sec for a few k/v writes is plenty enough

    /** Pub/sub topic on which the migration invalidator is initialized. */
    public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval";

    /** Framework property checked at start to decide whether an invalidator is registered. */
    public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled";

    /** Framework property holding the cluster node id. */
    public static final String NODE_ID_PROP = "repository.clustering.id";

    /** Random source used for the fallback node id and for the lock retry delay. */
    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)

    /** Executor running the migration steps; created in start and cleared in stop. */
    protected MigrationThreadPoolExecutor executor;

    /** Invalidator created in start when clustering is enabled, {@code null} otherwise. */
    protected MigrationInvalidator invalidator;
106
107    public static class MigrationInvalidation implements SerializableMessage {
108
109        private static final long serialVersionUID = 1L;
110
111        public final String id;
112
113        public MigrationInvalidation(String id) {
114            this.id = id;
115        }
116
117        @Override
118        public void serialize(OutputStream out) throws IOException {
119            IOUtils.write(id, out, UTF_8);
120        }
121
122        public static MigrationInvalidation deserialize(InputStream in) throws IOException {
123            String id = IOUtils.toString(in, UTF_8);
124            return new MigrationInvalidation(id);
125        }
126
127        @Override
128        public String toString() {
129            return getClass().getSimpleName() + "(" + id + ")";
130        }
131    }
132
133    public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> {
134
135        @Override
136        public MigrationInvalidation deserialize(InputStream in) throws IOException {
137            return MigrationInvalidation.deserialize(in);
138        }
139
140        @Override
141        public void receivedMessage(MigrationInvalidation message) {
142            String id = message.id;
143            Migrator migrator = getMigrator(id);
144            if (migrator == null) {
145                log.error("Unknown migration id received in invalidation: {}", id);
146                return;
147            }
148            // don't send again invalidation, call sub migrator directly
149            ((InvalidatorMigrator) migrator).migrator.notifyStatusChange();
150        }
151    }
152
153    /**
154     * A {@link Migrator migrator} wrapper to send invalidations to other nodes when calling
155     * {@link #notifyStatusChange()}.
156     *
157     * @since 10.10
158     */
159    public static class InvalidatorMigrator implements Migrator {
160
161        protected final String id;
162
163        protected final Migrator migrator;
164
165        protected final MigrationInvalidator invalidator;
166
167        public InvalidatorMigrator(String id, Migrator migrator, MigrationInvalidator invalidator) {
168            this.id = id;
169            this.migrator = migrator;
170            this.invalidator = invalidator;
171        }
172
173        @Override
174        public String probeState() {
175            return migrator.probeState();
176        }
177
178        @Override
179        public void run(String step, MigrationContext migrationContext) {
180            migrator.run(step, migrationContext);
181        }
182
183        @Override
184        public void notifyStatusChange() {
185            migrator.notifyStatusChange();
186            invalidator.sendMessage(new MigrationInvalidation(id));
187        }
188    }
189
190    protected static KeyValueStore getKeyValueStore() {
191        KeyValueService service = Framework.getService(KeyValueService.class);
192        Objects.requireNonNull(service, "Missing KeyValueService");
193        return service.getKeyValueStore(KEYVALUE_STORE_NAME);
194    }
195
196    public Collection<MigrationDescriptor> getMigrationDescriptors() {
197        return getDescriptors(XP_CONFIG);
198    }
199
200    @Override
201    public int getApplicationStartedOrder() {
202        return KeyValueServiceImpl.APPLICATION_STARTED_ORDER + 10;
203    }
204
205    /**
206     * Progress reporter that reports progress in the key/value store.
207     *
208     * @since 9.3
209     */
210    protected static class ProgressReporter {
211
212        protected final String id;
213
214        public ProgressReporter(String id) {
215            this.id = id;
216        }
217
218        /**
219         * Reports progress. If num or total are -2 then null is used.
220         */
221        public void reportProgress(String message, long num, long total, boolean ping) {
222            KeyValueStore keyValueStore = getKeyValueStore();
223            keyValueStore.put(id + PROGRESS_MESSAGE, message);
224            keyValueStore.put(id + PROGRESS_NUM, num == -2 ? null : String.valueOf(num));
225            keyValueStore.put(id + PROGRESS_TOTAL, total == -2 ? null : String.valueOf(total));
226            keyValueStore.put(id + PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null);
227        }
228    }
229
230    /**
231     * Migration context implementation that reports progress in the key/value store and can be shutdown.
232     *
233     * @since 9.3
234     */
235    protected static class MigrationContextImpl implements MigrationContext {
236
237        protected final ProgressReporter progressReporter;
238
239        protected volatile boolean shutdown;
240
241        public MigrationContextImpl(ProgressReporter progressReporter) {
242            this.progressReporter = progressReporter;
243        }
244
245        @Override
246        public void reportProgress(String message, long num, long total) {
247            progressReporter.reportProgress(message, num, total, true);
248        }
249
250        @Override
251        public void requestShutdown() {
252            shutdown = true;
253        }
254
255        @Override
256        public boolean isShutdownRequested() {
257            return shutdown || Thread.currentThread().isInterrupted();
258        }
259    }
260
261    /**
262     * Runnable for the migrator, that knows about the migration context.
263     *
264     * @since 9.3
265     */
266    protected static class MigratorWithContext implements Runnable {
267
268        protected final Consumer<MigrationContext> migration;
269
270        protected final MigrationContext migrationContext;
271
272        protected final BiConsumer<MigrationContext, Throwable> afterMigration;
273
274        public MigratorWithContext(Consumer<MigrationContext> migration, ProgressReporter progressReporter,
275                BiConsumer<MigrationContext, Throwable> afterMigration) {
276            this.migration = migration;
277            this.migrationContext = new MigrationContextImpl(progressReporter);
278            this.afterMigration = afterMigration;
279        }
280
281        @Override
282        public void run() {
283            migration.accept(migrationContext);
284        }
285
286        public void afterMigration(Throwable t) {
287            afterMigration.accept(migrationContext, t);
288        }
289
290        public void requestShutdown() {
291            migrationContext.requestShutdown();
292        }
293    }
294
295    /**
296     * Thread pool executor that records {@link Runnable}s to be able to request shutdown on them.
297     *
298     * @since 9.3
299     */
300    protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor {
301
302        protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<>();
303
304        public MigrationThreadPoolExecutor() {
305            // like Executors.newCachedThreadPool but with keepAliveTime of 0
306            super(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
307        }
308
309        @Override
310        protected void beforeExecute(Thread thread, Runnable runnable) {
311            runnables.add((MigratorWithContext) runnable);
312        }
313
314        @Override
315        protected void afterExecute(Runnable runnable, Throwable t) {
316            runnables.remove(runnable);
317            ((MigratorWithContext) runnable).afterMigration(t);
318        }
319
320        public void requestShutdown() {
321            runnables.forEach(MigratorWithContext::requestShutdown);
322        }
323    }
324
325    @Override
326    public void start(ComponentContext context) {
327        super.start(context);
328        if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) {
329            // register migration invalidator
330            String nodeId = Framework.getProperty(NODE_ID_PROP);
331            if (StringUtils.isBlank(nodeId)) {
332                nodeId = String.valueOf(RANDOM.nextLong());
333                log.warn("Missing cluster node id configuration, please define it explicitly "
334                        + "(usually through repository.clustering.id). Using random cluster node id instead: "
335                        + nodeId);
336            } else {
337                nodeId = nodeId.trim();
338            }
339            invalidator = new MigrationInvalidator();
340            invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, nodeId);
341            log.info("Registered migration invalidator for node: {}", nodeId);
342        } else {
343            log.info("Not registering a migration invalidator because clustering is not enabled");
344        }
345
346        executor = new MigrationThreadPoolExecutor();
347        Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
348
349            @Override
350            public void beforeStop(ComponentManager mgr, boolean isStandby) {
351                // flag all migration threads as shutdown requested, without interrupting them
352                executor.requestShutdown();
353            }
354
355            @Override
356            public void afterStop(ComponentManager mgr, boolean isStandby) {
357                Framework.getRuntime().getComponentManager().removeListener(this);
358            }
359        });
360    }
361
362    @Override
363    public void stop(ComponentContext context) throws InterruptedException {
364        if (invalidator != null) {
365            invalidator.close();
366            invalidator = null;
367        }
368        // interrupt all migration tasks
369        executor.shutdownNow();
370        executor.awaitTermination(10, TimeUnit.SECONDS); // wait 10s for termination
371        executor = null;
372        super.stop(context);
373    }
374
375    @Override
376    public MigrationStatus getStatus(String id) {
377        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
378        if (descr == null) {
379            return null; // migration unknown
380        }
381        KeyValueStore kv = getKeyValueStore();
382        String state = kv.getString(id);
383        if (state != null) {
384            return new MigrationStatus(state);
385        }
386        String step = kv.getString(id + STEP);
387        if (step == null) {
388            state = descr.defaultState;
389            return new MigrationStatus(state);
390        }
391        long startTime = Long.parseLong(kv.getString(id + START_TIME));
392        long pingTime = Long.parseLong(kv.getString(id + PING_TIME));
393        String progressMessage = kv.getString(id + PROGRESS_MESSAGE);
394        long progressNum = Long.parseLong(kv.getString(id + PROGRESS_NUM));
395        long progressTotal = Long.parseLong(kv.getString(id + PROGRESS_TOTAL));
396        if (progressMessage == null) {
397            progressMessage = "";
398        }
399        return new MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal);
400    }
401
402    @Override
403    public String probeAndSetState(String id) {
404        Migrator migrator = getMigrator(id);
405        String state = migrator.probeState();
406        if (state != null) {
407            ProgressReporter progressReporter = new ProgressReporter(id);
408            setState(id, state, migrator, progressReporter);
409        }
410        return state;
411    }
412
413    protected void setState(String id, String state, Migrator migrator, ProgressReporter progressReporter) {
414        atomic(id, kv -> {
415            String currentState = kv.getString(id);
416            String currentStep = kv.getString(id + STEP);
417            if (currentState == null && currentStep != null) {
418                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
419            }
420            setState(id, state, progressReporter, kv);
421        });
422        migrator.notifyStatusChange();
423    }
424
425    protected void setState(String id, String state, ProgressReporter progressReporter, KeyValueStore kv) {
426        kv.put(id, state);
427        kv.put(id + STEP, (String) null);
428        kv.put(id + START_TIME, (String) null);
429        progressReporter.reportProgress(null, -2, -2, false);
430    }
431
432    @Override
433    public void runStep(String id, String step) {
434        Migrator migrator = getMigrator(id);
435        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
436        MigrationStepDescriptor stepDescr = descr.steps.get(step);
437        if (stepDescr == null) {
438            throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id);
439        }
440
441        ProgressReporter progressReporter = new ProgressReporter(id);
442
443        // switch to running
444        atomic(id, kv -> {
445            String state = kv.getString(id);
446            String currentStep = kv.getString(id + STEP);
447            if (state == null && currentStep == null) {
448                state = descr.defaultState;
449                if (!descr.states.containsKey(state)) {
450                    throw new IllegalArgumentException("Invalid default state: " + state + " for migration: " + id);
451                }
452            } else if (state == null) {
453                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
454            }
455            if (!descr.states.containsKey(state)) {
456                throw new IllegalArgumentException("Invalid current state: " + state + " for migration: " + id);
457            }
458            if (!stepDescr.fromState.equals(state)) {
459                throw new IllegalArgumentException(
460                        "Invalid step: " + step + " for migration: " + id + " in state: " + state);
461            }
462            String time = String.valueOf(System.currentTimeMillis());
463            kv.put(id + STEP, step);
464            kv.put(id + START_TIME, time);
465            progressReporter.reportProgress("", 0, -1, true);
466            kv.put(id, (String) null);
467        });
468
469        // allow notification of running step
470        migrator.notifyStatusChange();
471
472        Consumer<MigrationContext> migration = migrationContext -> {
473            Thread.currentThread().setName("Nuxeo-Migrator-" + id);
474            migrator.run(step, migrationContext);
475        };
476
477        BiConsumer<MigrationContext, Throwable> afterMigration = (migrationContext, t) -> {
478            if (t != null) {
479                log.error("Exception during execution of step: {} for migration: {}", step, id, t);
480            }
481            // after the migrator is finished, change state, except if shutdown is requested or exception
482            String state = t != null || migrationContext.isShutdownRequested()
483                    ? stepDescr.fromState
484                    : stepDescr.toState;
485            atomic(id, kv -> setState(id, state, progressReporter, kv));
486            // allow notification of new state
487            migrator.notifyStatusChange();
488        };
489
490        executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration));
491    }
492
493    protected Migrator getMigrator(String id) {
494        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
495        if (descr == null) {
496            throw new IllegalArgumentException("Unknown migration: " + id);
497        }
498        Class<?> klass = descr.klass;
499        if (!Migrator.class.isAssignableFrom(klass)) {
500            throw new RuntimeException(
501                    "Invalid class not implementing Migrator: " + klass.getName() + " for migration: " + id);
502        }
503        try {
504            Migrator migrator = (Migrator) klass.getConstructor().newInstance();
505            if (invalidator != null) {
506                migrator = new InvalidatorMigrator(id, migrator, invalidator);
507            }
508            return migrator;
509        } catch (ReflectiveOperationException e) {
510            throw new RuntimeException(e);
511        }
512    }
513
514    /**
515     * Executes something while setting a lock, retrying a few times if the lock is already set.
516     */
517    protected void atomic(String id, Consumer<KeyValueStore> consumer) {
518        KeyValueStore kv = getKeyValueStore();
519        String nodeid = Framework.getProperty(NODE_ID_PROP);
520        for (int i = 0; i < 5; i++) {
521            // the value of the lock is useful for debugging
522            String value = Instant.now() + " node=" + nodeid;
523            if (kv.compareAndSet(id + LOCK, null, value, WRITE_LOCK_TTL)) {
524                try {
525                    consumer.accept(kv);
526                    return;
527                } finally {
528                    kv.put(id + LOCK, (String) null);
529                }
530            }
531            try {
532                Thread.sleep((long) (RANDOM.nextInt(100) * i));
533            } catch (InterruptedException e) {
534                Thread.currentThread().interrupt();
535                throw new RuntimeException(e);
536            }
537        }
538        String currentLock = kv.getString(id + LOCK);
539        throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock);
540    }
541
542}