summaryrefslogtreecommitdiffstats
path: root/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java
diff options
context:
space:
mode:
Diffstat (limited to 'mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java')
-rw-r--r--mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java105
1 files changed, 105 insertions, 0 deletions
diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java
new file mode 100644
index 0000000..ec15ba6
--- /dev/null
+++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.apache.nifi.nar;
+
+import org.apache.nifi.bundle.Bundle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+
+/**
+ * Uses the Java executor service scheduler to continuously load new DCAE jars
+ */
+public class DCAEAutoLoader {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DCAEAutoLoader.class);
+
+ private static final long POLL_INTERVAL_MS = 5000;
+
+ /**
+ * Runnable task that grabs list of remotely stored jars, identifies ones that haven't
+ * been processed, builds Nifi bundles for those unprocessed ones and loads them into
+ * the global extension manager.
+ */
+ private static class LoaderTask implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoaderTask.class);
+
+ private final URI indexJsonDcaeJars;
+ private final ExtensionDiscoveringManager extensionManager;
+ private final Set<URL> processed = new LinkedHashSet();
+
+ private LoaderTask(URI indexJsonDcaeJars, ExtensionDiscoveringManager extensionManager) {
+ this.indexJsonDcaeJars = indexJsonDcaeJars;
+ this.extensionManager = extensionManager;
+ }
+
+ @Override
+ public void run() {
+ try {
+ List<URL> toProcess = DCAEClassLoaders.getDCAEJarsURLs(this.indexJsonDcaeJars);
+ toProcess.removeAll(processed);
+
+ if (!toProcess.isEmpty()) {
+ Set<Bundle> bundles = DCAEClassLoaders.createDCAEBundles(toProcess);
+ this.extensionManager.discoverExtensions(bundles);
+ processed.addAll(toProcess);
+
+ LOGGER.info(String.format("#Added DCAE bundles: %d, #Total DCAE bundles: %d ",
+ bundles.size(), processed.size()));
+ }
+ } catch (final Exception e) {
+ LOGGER.error("Error loading DCAE jars due to: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ private ScheduledFuture taskFuture;
+
+ public synchronized void start(URI indexJsonDcaeJars, final ExtensionDiscoveringManager extensionManager) {
+ // Restricting to a single thread
+ if (taskFuture != null && !taskFuture.isCancelled()) {
+ return;
+ }
+
+ LOGGER.info("Starting DCAE Auto-Loader: {}", new Object[]{indexJsonDcaeJars});
+
+ LoaderTask task = new LoaderTask(indexJsonDcaeJars, extensionManager);
+ this.taskFuture = executor.scheduleAtFixedRate(task, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ LOGGER.info("DCAE Auto-Loader started");
+ }
+
+ public synchronized void stop() {
+ if (this.taskFuture != null) {
+ this.taskFuture.cancel(true);
+ LOGGER.info("DCAE Auto-Loader stopped");
+ }
+ }
+
+}