001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.migration;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.time.Instant;
027import java.util.List;
028import java.util.Map;
029import java.util.Objects;
030import java.util.Random;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.SynchronousQueue;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.function.BiConsumer;
036import java.util.function.Consumer;
037
038import org.apache.commons.io.IOUtils;
039import org.apache.commons.lang3.StringUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.kv.KeyValueService;
044import org.nuxeo.runtime.kv.KeyValueServiceImpl;
045import org.nuxeo.runtime.kv.KeyValueStore;
046import org.nuxeo.runtime.migration.MigrationDescriptor.MigrationStepDescriptor;
047import org.nuxeo.runtime.model.ComponentContext;
048import org.nuxeo.runtime.model.ComponentInstance;
049import org.nuxeo.runtime.model.ComponentManager;
050import org.nuxeo.runtime.model.DefaultComponent;
051import org.nuxeo.runtime.model.SimpleContributionRegistry;
052import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
053import org.nuxeo.runtime.pubsub.SerializableMessage;
054
055/**
056 * Implementation for the Migration Service.
057 * <p>
058 * Data about migration status is stored in the "migration" Key/Value Store in the following format:
059 *
060 * <pre>
061 * mymigration:lock         write lock, containing debug info about locker; set with a TTL
062 * mymigration              the state of the migration, if not running
063 * mymigration:step         the step of the migration, if running
064 * mymigration:starttime    the migration step start time (milliseconds since epoch)
065 * mymigration:pingtime     the migration step last ping time (milliseconds since epoch)
066 * mymigration:message      the migration step current message
067 * mymigration:num          the migration step current num
068 * mymigration:total        the migration step current total
069 * </pre>
070 *
071 * @since 9.3
072 */
073public class MigrationServiceImpl extends DefaultComponent implements MigrationService {
074
075    // package-private to avoid synthetic accessor for access from nested class
076    static final Log log = LogFactory.getLog(MigrationServiceImpl.class);
077
078    public static final String KEYVALUE_STORE_NAME = "migration";
079
080    public static final String CONFIG_XP = "configuration";
081
082    public static final String LOCK = ":lock";
083
084    public static final String STEP = ":step";
085
086    public static final String START_TIME = ":starttime";
087
088    public static final String PING_TIME = ":pingtime";
089
090    public static final String PROGRESS_MESSAGE = ":message";
091
092    public static final String PROGRESS_NUM = ":num";
093
094    public static final String PROGRESS_TOTAL = ":total";
095
096    public static final long WRITE_LOCK_TTL = 10; // 10 sec for a few k/v writes is plenty enough
097
098    public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval";
099
100    public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled";
101
102    public static final String NODE_ID_PROP = "repository.clustering.id";
103
104    protected static final Random RANDOM = new Random();
105
106    protected final MigrationRegistry registry = new MigrationRegistry();
107
108    protected MigrationThreadPoolExecutor executor;
109
110    protected MigrationInvalidator invalidator;
111
112    public static class MigrationRegistry extends SimpleContributionRegistry<MigrationDescriptor> {
113
114        @Override
115        public String getContributionId(MigrationDescriptor contrib) {
116            return contrib.id;
117        }
118
119        public MigrationDescriptor getMigrationDescriptor(String id) {
120            return getCurrentContribution(id);
121        }
122
123        public Map<String, MigrationDescriptor> getMigrationDescriptors() {
124            return currentContribs;
125        }
126
127        @Override
128        public boolean isSupportingMerge() {
129            return true;
130        }
131
132        @Override
133        public MigrationDescriptor clone(MigrationDescriptor orig) {
134            return new MigrationDescriptor(orig);
135        }
136
137        @Override
138        public void merge(MigrationDescriptor src, MigrationDescriptor dst) {
139            dst.merge(src);
140        }
141    }
142
143    public static class MigrationInvalidation implements SerializableMessage {
144
145        private static final long serialVersionUID = 1L;
146
147        public final String id;
148
149        public MigrationInvalidation(String id) {
150            this.id = id;
151        }
152
153        @Override
154        public void serialize(OutputStream out) throws IOException {
155            IOUtils.write(id, out, UTF_8);
156        }
157
158        public static MigrationInvalidation deserialize(InputStream in) throws IOException {
159            String id = IOUtils.toString(in, UTF_8);
160            return new MigrationInvalidation(id);
161        }
162
163        @Override
164        public String toString() {
165            return getClass().getSimpleName() + "(" + id + ")";
166        }
167    }
168
169    public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> {
170
171        @Override
172        public MigrationInvalidation deserialize(InputStream in) throws IOException {
173            return MigrationInvalidation.deserialize(in);
174        }
175
176        @Override
177        public void receivedMessage(MigrationInvalidation message) {
178            String id = message.id;
179            StatusChangeNotifier notifier = getStatusChangeNotifier(id);
180            if (notifier == null) {
181                log.error("Unknown migration id received in invalidation: " + id);
182                return;
183            }
184            notifier.notifyStatusChange();
185        }
186    }
187
188    protected static KeyValueStore getKeyValueStore() {
189        KeyValueService service = Framework.getService(KeyValueService.class);
190        Objects.requireNonNull(service, "Missing KeyValueService");
191        return service.getKeyValueStore(KEYVALUE_STORE_NAME);
192    }
193
194    @Override
195    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
196        switch (extensionPoint) {
197        case CONFIG_XP:
198            registerMigrationDescriptor((MigrationDescriptor) contribution);
199            break;
200        default:
201            throw new RuntimeException("Unknown extension point: " + extensionPoint);
202        }
203    }
204
205    @Override
206    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
207        switch (extensionPoint) {
208        case CONFIG_XP:
209            unregisterMigrationDescriptor((MigrationDescriptor) contribution);
210            break;
211        }
212    }
213
214    public void registerMigrationDescriptor(MigrationDescriptor descriptor) {
215        registry.addContribution(descriptor);
216        log.info("Registered migration: " + descriptor.id);
217    }
218
219    public void unregisterMigrationDescriptor(MigrationDescriptor descriptor) {
220        registry.removeContribution(descriptor);
221        log.info("Unregistered migration: " + descriptor.id);
222    }
223
224    public Map<String, MigrationDescriptor> getMigrationDescriptors() {
225        return registry.getMigrationDescriptors();
226    }
227
228    @Override
229    public int getApplicationStartedOrder() {
230        return KeyValueServiceImpl.APPLICATION_STARTED_ORDER + 10;
231    }
232
233    /**
234     * Progress reporter that reports progress in the key/value store.
235     *
236     * @since 9.3
237     */
238    protected static class ProgressReporter {
239
240        protected final String id;
241
242        public ProgressReporter(String id) {
243            this.id = id;
244        }
245
246        /**
247         * Reports progress. If num or total are -2 then null is used.
248         */
249        public void reportProgress(String message, long num, long total, boolean ping) {
250            KeyValueStore keyValueStore = getKeyValueStore();
251            keyValueStore.put(id + PROGRESS_MESSAGE, message);
252            keyValueStore.put(id + PROGRESS_NUM, num == -2 ? null : String.valueOf(num));
253            keyValueStore.put(id + PROGRESS_TOTAL, total == -2 ? null : String.valueOf(total));
254            keyValueStore.put(id + PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null);
255        }
256    }
257
258    /**
259     * Migration context implementation that reports progress in the key/value store and can be shutdown.
260     *
261     * @since 9.3
262     */
263    protected static class MigrationContextImpl implements MigrationContext {
264
265        protected final ProgressReporter progressReporter;
266
267        protected volatile boolean shutdown;
268
269        public MigrationContextImpl(ProgressReporter progressReporter) {
270            this.progressReporter = progressReporter;
271        }
272
273        @Override
274        public void reportProgress(String message, long num, long total) {
275            progressReporter.reportProgress(message, num, total, true);
276        }
277
278        @Override
279        public void requestShutdown() {
280            shutdown = true;
281        }
282
283        @Override
284        public boolean isShutdownRequested() {
285            return shutdown || Thread.currentThread().isInterrupted();
286        }
287    }
288
289    /**
290     * Runnable for the migrator, that knows about the migration context.
291     *
292     * @since 9.3
293     */
294    protected static class MigratorWithContext implements Runnable {
295
296        protected final Consumer<MigrationContext> migration;
297
298        protected final MigrationContext migrationContext;
299
300        protected final BiConsumer<MigrationContext, Throwable> afterMigration;
301
302        public MigratorWithContext(Consumer<MigrationContext> migration, ProgressReporter progressReporter,
303                BiConsumer<MigrationContext, Throwable> afterMigration) {
304            this.migration = migration;
305            this.migrationContext = new MigrationContextImpl(progressReporter);
306            this.afterMigration = afterMigration;
307        }
308
309        @Override
310        public void run() {
311            migration.accept(migrationContext);
312        }
313
314        public void afterMigration(Throwable t) {
315            afterMigration.accept(migrationContext, t);
316        }
317
318        public void requestShutdown() {
319            migrationContext.requestShutdown();
320        }
321    }
322
323    /**
324     * Thread pool executor that records {@link Runnable}s to be able to request shutdown on them.
325     *
326     * @since 9.3
327     */
328    protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor {
329
330        protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<>();
331
332        public MigrationThreadPoolExecutor() {
333            // like Executors.newCachedThreadPool but with keepAliveTime of 0
334            super(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
335        }
336
337        @Override
338        protected void beforeExecute(Thread thread, Runnable runnable) {
339            runnables.add((MigratorWithContext) runnable);
340        }
341
342        @Override
343        protected void afterExecute(Runnable runnable, Throwable t) {
344            runnables.remove(runnable);
345            ((MigratorWithContext) runnable).afterMigration(t);
346        }
347
348        public void requestShutdown() {
349            runnables.forEach(migratorWithContext -> migratorWithContext.requestShutdown());
350        }
351    }
352
353    @Override
354    public void start(ComponentContext context) {
355        if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) {
356            // register migration invalidator
357            String nodeId = Framework.getProperty(NODE_ID_PROP);
358            if (StringUtils.isBlank(nodeId)) {
359                nodeId = String.valueOf(RANDOM.nextLong());
360                log.warn("Missing cluster node id configuration, please define it explicitly "
361                        + "(usually through repository.clustering.id). Using random cluster node id instead: "
362                        + nodeId);
363            } else {
364                nodeId = nodeId.trim();
365            }
366            invalidator = new MigrationInvalidator();
367            invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, nodeId);
368            log.info("Registered migration invalidator for node: " + nodeId);
369        } else {
370            log.info("Not registering a migration invalidator because clustering is not enabled");
371        }
372
373        executor = new MigrationThreadPoolExecutor();
374        Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
375
376            @Override
377            public void beforeStop(ComponentManager mgr, boolean isStandby) {
378                // flag all migration threads as shutdown requested, without interrupting them
379                executor.requestShutdown();
380            }
381
382            @Override
383            public void afterStop(ComponentManager mgr, boolean isStandby) {
384                Framework.getRuntime().getComponentManager().removeListener(this);
385            }
386        });
387    }
388
389    @Override
390    public void stop(ComponentContext context) throws InterruptedException {
391        // interrupt all migration tasks
392        executor.shutdownNow();
393        executor.awaitTermination(10, TimeUnit.SECONDS); // wait 10s for termination
394        executor = null;
395    }
396
397    @Override
398    public MigrationStatus getStatus(String id) {
399        MigrationDescriptor descr = registry.getMigrationDescriptor(id);
400        if (descr == null) {
401            return null; // migration unknown
402        }
403        KeyValueStore kv = getKeyValueStore();
404        String state = kv.getString(id);
405        if (state != null) {
406            return new MigrationStatus(state);
407        }
408        String step = kv.getString(id + STEP);
409        if (step == null) {
410            state = descr.getDefaultState();
411            return new MigrationStatus(state);
412        }
413        long startTime = Long.parseLong(kv.getString(id + START_TIME));
414        long pingTime = Long.parseLong(kv.getString(id + PING_TIME));
415        String progressMessage = kv.getString(id + PROGRESS_MESSAGE);
416        long progressNum = Long.parseLong(kv.getString(id + PROGRESS_NUM));
417        long progressTotal = Long.parseLong(kv.getString(id + PROGRESS_TOTAL));
418        if (progressMessage == null) {
419            progressMessage = "";
420        }
421        return new MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal);
422    }
423
424    @Override
425    public void runStep(String id, String step) {
426        MigrationDescriptor descr = registry.getMigrationDescriptor(id);
427        if (descr == null) {
428            throw new IllegalArgumentException("Unknown migration: " + id);
429        }
430        MigrationStepDescriptor stepDescr = descr.getSteps().get(step);
431        if (stepDescr == null) {
432            throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id);
433        }
434        Class<?> klass = stepDescr.getKlass();
435        if (!Migrator.class.isAssignableFrom(klass)) {
436            throw new RuntimeException("Invalid class not implementing Migrator: " + klass.getName() + " for step: "
437                    + step + " for migration: " + id);
438        }
439        Migrator migrator;
440        try {
441            migrator = (Migrator) klass.getConstructor().newInstance();
442        } catch (ReflectiveOperationException e) {
443            throw new RuntimeException(e);
444        }
445        StatusChangeNotifier notifier = getStatusChangeNotifier(descr);
446
447        ProgressReporter progressReporter = new ProgressReporter(id);
448
449        // switch to running
450        atomic(id, kv -> {
451            String state = kv.getString(id);
452            String currentStep = kv.getString(id + STEP);
453            if (state == null && currentStep == null) {
454                state = descr.getDefaultState();
455                if (!descr.getStates().containsKey(state)) {
456                    throw new RuntimeException("Invalid default state: " + state + " for migration: " + id);
457                }
458            } else if (state == null) {
459                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
460            }
461            if (!descr.getStates().containsKey(state)) {
462                throw new RuntimeException("Invalid current state: " + state + " for migration: " + id);
463            }
464            if (!stepDescr.getFromState().equals(state)) {
465                throw new IllegalArgumentException(
466                        "Invalid step: " + step + " for migration: " + id + " in state: " + state);
467            }
468            String time = String.valueOf(System.currentTimeMillis());
469            kv.put(id + STEP, step);
470            kv.put(id + START_TIME, time);
471            progressReporter.reportProgress("", 0, -1, true);
472            kv.put(id, (String) null);
473        });
474
475        // allow notification of running step
476        notifier.notifyStatusChange();
477
478        Consumer<MigrationContext> migration = migrationContext -> {
479            Thread.currentThread().setName("Nuxeo-Migrator-" + id);
480            migrator.run(migrationContext);
481        };
482
483        BiConsumer<MigrationContext, Throwable> afterMigration = (migrationContext, t) -> {
484            if (t != null) {
485                log.error("Exception during execution of step: " + step + " for migration: " + id, t);
486            }
487            // after the migrator is finished, change state, except if shutdown is requested or exception
488            String state = (t != null || migrationContext.isShutdownRequested()) ? stepDescr.getFromState()
489                    : stepDescr.getToState();
490            atomic(id, kv -> {
491                kv.put(id, state);
492                kv.put(id + STEP, (String) null);
493                kv.put(id + START_TIME, (String) null);
494                progressReporter.reportProgress(null, -2, -2, false);
495            });
496            // allow notification of new state
497            notifier.notifyStatusChange();
498        };
499
500        executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration));
501    }
502
503    protected StatusChangeNotifier getStatusChangeNotifier(String id) {
504        MigrationDescriptor descr = registry.getMigrationDescriptor(id);
505        return descr == null ? null : getStatusChangeNotifier(descr);
506    }
507
508    protected StatusChangeNotifier getStatusChangeNotifier(MigrationDescriptor descr) {
509        Class<?> klass = descr.getStatusChangeNotifierClass();
510        if (klass == null) {
511            throw new RuntimeException("Missing statusChangeNotifier for migration: " + descr.getId());
512        }
513        if (!StatusChangeNotifier.class.isAssignableFrom(klass)) {
514            throw new RuntimeException("Invalid class not implementing StatusChangeNotifier: " + klass.getName()
515                    + " for migration: " + descr.getId());
516        }
517        StatusChangeNotifier notifier;
518        try {
519            notifier = (StatusChangeNotifier) klass.getConstructor().newInstance();
520        } catch (ReflectiveOperationException e) {
521            throw new RuntimeException(e);
522        }
523        return notifier;
524    }
525
526    /**
527     * Executes something while setting a lock, retrying a few times if the lock is already set.
528     */
529    protected void atomic(String id, Consumer<KeyValueStore> consumer) {
530        KeyValueStore kv = getKeyValueStore();
531        String nodeid = Framework.getProperty("repository.clustering.id");
532        for (int i = 0; i < 5; i++) {
533            // the value of the lock is useful for debugging
534            String value = Instant.now() + " node=" + nodeid;
535            if (kv.compareAndSet(id + LOCK, null, value, WRITE_LOCK_TTL)) {
536                try {
537                    consumer.accept(kv);
538                    return;
539                } finally {
540                    kv.put(id + LOCK, (String) null);
541                }
542            }
543            try {
544                Thread.sleep((long) (Math.random() * 100 * i));
545            } catch (InterruptedException e) {
546                throw new RuntimeException(e);
547            }
548        }
549        String currentLock = kv.getString(id + LOCK);
550        throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock);
551    }
552
553}