From 2d1528b34c45a650e2407bbc90737cc4ca6ec042 Mon Sep 17 00:00:00 2001 From: Andrew Gauld Date: Wed, 26 Feb 2020 15:48:32 +0000 Subject: Link DCAE MOD design tool to Acumos Adapter Add import item to global menu with choices of import models, component specs, and data formats. Add prefix to acumos adapter API for consistency with rest of DCAE MOD and to allow adding acumos adapter to k8s/rancher ingress or other reverse proxy mechanism so it can be accessed by browser javascript in the design tool. Remove dcae mod hostname mechanism for callbacks from browser javascript since browser security only allows javascript to call back to the originating host port. Clean up indentation only differences between original NiFi code and ONAP versions. Remove 3 ONAP java files that are now identical to original NiFi versions. Change-Id: I23bbd98d5b171d624ef35088cd821aff2918fc1b Issue-ID: DCAEGEN2-1860 Signed-off-by: Andrew Gauld --- adapter/acumos/Changelog.md | 2 + adapter/acumos/aoconversion/index.html | 8 +- adapter/acumos/aoconversion/scanner.py | 10 +- adapter/acumos/pom.xml | 2 +- adapter/acumos/setup.py | 2 +- mod/designtool/designtool-web/pom.xml | 9 +- mod/designtool/designtool-web/sh/applypatches.sh | 13 +- .../src/main/java/org/apache/nifi/NiFi.java | 446 -- .../java/org/apache/nifi/util/NiFiProperties.java | 10 - .../apache/nifi/web/StandardNiFiServiceFacade.java | 4899 -------------------- .../org/apache/nifi/web/api/dto/DtoFactory.java | 156 +- .../nifi/web/api/dto/FlowConfigurationDTO.java | 182 - .../WEB-INF/partials/canvas/canvas-header.jsp | 72 +- .../webapp/WEB-INF/partials/canvas/navigation.jsp | 50 + .../src/main/webapp/css/navigation.css | 66 + .../src/main/webapp/fonts/flowfont/flowfont.eot | Bin 0 -> 11248 bytes .../src/main/webapp/fonts/flowfont/flowfont.svg | 70 + .../src/main/webapp/fonts/flowfont/flowfont.ttf | Bin 0 -> 11080 bytes .../src/main/webapp/fonts/flowfont/flowfont.woff | Bin 0 -> 11296 bytes .../src/main/webapp/fonts/flowfont/flowfont.woff2 | Bin 0 -> 5476 bytes .../src/main/webapp/js/jquery/dcae-mod.js | 167 +- .../nf-ng-canvas-global-menu-controller.js | 501 ++ .../src/main/webapp/js/nf/canvas/nf-settings.js | 23 +- mod/designtool/nifi-war-to-jar/pom.xml | 2 +- mod/designtool/pom.xml | 2 +- 25 files changed, 987 insertions(+), 5705 deletions(-) delete mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java delete mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java delete mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/FlowConfigurationDTO.java create mode 100644 mod/designtool/designtool-web/src/main/webapp/fonts/flowfont/flowfont.eot create mode 100644 mod/designtool/designtool-web/src/main/webapp/fonts/flowfont/flowfont.svg create mode 100644 mod/designtool/designtool-web/src/main/webapp/fonts/flowfont/flowfont.ttf create mode 100644 mod/designtool/designtool-web/src/main/webapp/fonts/flowfont/flowfont.woff create mode 100644 mod/designtool/designtool-web/src/main/webapp/fonts/flowfont/flowfont.woff2 create mode 100644 mod/designtool/designtool-web/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-global-menu-controller.js diff --git a/adapter/acumos/Changelog.md b/adapter/acumos/Changelog.md index 0823a8e..e900ad7 100644 --- a/adapter/acumos/Changelog.md +++ b/adapter/acumos/Changelog.md @@ -4,5 +4,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [1.0.1] - 3/20/2020 + * Adjust URL paths for consistency with DCAE GEN design tool ## [1.0.0] - 11/13/2019 * Onboard models from Acumos - initial version diff --git a/adapter/acumos/aoconversion/index.html b/adapter/acumos/aoconversion/index.html index c94621a..31f2372 100644 --- a/adapter/acumos/aoconversion/index.html +++ b/adapter/acumos/aoconversion/index.html @@ -156,7 +156,7 @@ function esc(s) { function onBoard() { fcomp("onboardingInProgress").style.display = "block"; - var url = "/onboard.js?acumos=" + uecvalue("furl"); + var url = "/acumos-adapter/onboard.js?acumos=" + uecvalue("furl"); if (cvalue("catMenu") != "*") { url += "&catalogId=" + uecvalue("catMenu"); if (cvalue("solMenu") != "*") { @@ -178,19 +178,19 @@ function chooseSolution() { if (cvalue("solMenu") == "*") { updatevis(); } else { - lookupItem("acRevs", "revMenu", "/listRevisions.js?acumos=" + uecvalue("furl") + "&solutionId=" + uecvalue("solMenu")); + lookupItem("acRevs", "revMenu", "/acumos-adapter/listRevisions.js?acumos=" + uecvalue("furl") + "&solutionId=" + uecvalue("solMenu")); } } function chooseCatalog() { if (cvalue("catMenu") == "*") { updatevis(); } else { - lookupItem("acSols", "solMenu", "/listSolutions.js?acumos=" + uecvalue("furl") + "&catalogId=" + uecvalue("catMenu")); + lookupItem("acSols", "solMenu", "/acumos-adapter/listSolutions.js?acumos=" + uecvalue("furl") + "&catalogId=" + uecvalue("catMenu")); } } function lookupCatalogs() { fcomp("onboard").style.display = "block"; - lookupItem("cAcumos", "catMenu", "/listCatalogs.js?acumos=" + uecvalue("furl")); + lookupItem("cAcumos", "catMenu", "/acumos-adapter/listCatalogs.js?acumos=" + uecvalue("furl")); } function lookupItem(dblock, smenu, url) { fcomp(dblock).style.display = "block"; diff --git a/adapter/acumos/aoconversion/scanner.py b/adapter/acumos/aoconversion/scanner.py index 22a5922..41f18de 100644 --- a/adapter/acumos/aoconversion/scanner.py +++ b/adapter/acumos/aoconversion/scanner.py @@ -251,23 +251,23 @@ class Apihandler(BaseHTTPRequestHandler): def do_GET(self): self.doqp() - if self.path == '/' or self.path == '/index.html': + if self.path == '/' or self.path == '/index.html' or self.path == '/acumos-adapter/' or self.path == '/acumos-adapter/index.html': self.replyraw(self.server.index, 'text/html') return if 'acumos' not in self.qparams: self.send_error(400) return aa = _AcumosAccess(self.server.config, self.qparams['acumos']) - if self.path == '/listCatalogs.js': + if self.path == '/acumos-adapter/listCatalogs.js': self.replyjson([{'name': x['name'], 'id': x['catalogId']} for x in aa.jsonget('/catalogs')]) return - if self.path == '/listSolutions.js': + if self.path == '/acumos-adapter/listSolutions.js': if 'catalogId' not in self.qparams: self.send_error(400) return self.replyjson([{'name': x['name'], 'id': x['solutionId']} for x in aa.jsonget('/solutions?catalogId={}', self.qparams['catalogId'])]) return - if self.path == '/listRevisions.js': + if self.path == '/acumos-adapter/listRevisions.js': if 'solutionId' not in self.qparams: self.send_error(400) return @@ -277,7 +277,7 @@ class Apihandler(BaseHTTPRequestHandler): def do_POST(self): self.doqp() - if self.path == '/onboard.js': + if self.path == '/acumos-adapter/onboard.js': if 'acumos' not in self.qparams: self.send_error(400) return diff --git a/adapter/acumos/pom.xml b/adapter/acumos/pom.xml index 8495ee6..4d5facd 100644 --- a/adapter/acumos/pom.xml +++ b/adapter/acumos/pom.xml @@ -23,7 +23,7 @@ limitations under the License. 4.0.0 org.onap.dcaegen2.platform.adapter dcaegen2-platform-adapter-acumos - 1.0.0 + 1.0.1 UTF-8 . diff --git a/adapter/acumos/setup.py b/adapter/acumos/setup.py index 1845b9e..9001cd5 100644 --- a/adapter/acumos/setup.py +++ b/adapter/acumos/setup.py @@ -20,7 +20,7 @@ from setuptools import setup, find_packages setup( name="aoconversion", - version="1.0.0", + version="1.0.1", packages=find_packages(exclude=["tests.*", "tests"]), author="Tommy Carpenter, Andrew Gauld", author_email="tommy@research.att.com, agauld@att.com", diff --git a/mod/designtool/designtool-web/pom.xml b/mod/designtool/designtool-web/pom.xml index 7cf0d8a..8806964 100644 --- a/mod/designtool/designtool-web/pom.xml +++ b/mod/designtool/designtool-web/pom.xml @@ -24,7 +24,7 @@ limitations under the License. org.onap.dcaegen2.platform.mod designtool - 1.0.0-SNAPSHOT + 1.0.1-SNAPSHOT designtool-web war @@ -147,6 +147,13 @@ limitations under the License. true true + + true + ${project.build.directory}/${project.build.finalName}//nf-ng-canvas-global-menu-controller-min.js + + ${staging.dir}/js/nf/canvas/controllers/nf-ng-canvas-global-menu-controller.js + + true ${project.build.directory}/${project.build.finalName}//nf-ng-breadcrumbs-controller-min.js diff --git a/mod/designtool/designtool-web/sh/applypatches.sh b/mod/designtool/designtool-web/sh/applypatches.sh index 47de89c..0b20969 100755 --- a/mod/designtool/designtool-web/sh/applypatches.sh +++ b/mod/designtool/designtool-web/sh/applypatches.sh @@ -36,14 +36,10 @@ jar xf $NIFI_BASE_DIR/nifi-current/lib/nifi-framework-nar-$NIFI_VERSION.nar \ # patch jar files cd $PATCHES/WEB-INF/classes set +f -jar uf $NIFI_BASE_DIR/nifi-toolkit-current/lib/nifi-client-dto-$NIFI_VERSION.jar \ - org/apache/nifi/web/api/dto/FlowConfigurationDTO*.class jar uf $TARGETS/META-INF/bundled-dependencies/nifi-jetty-$NIFI_VERSION.jar \ org/apache/nifi/web/server/JettyServer*.class jar uf $NIFI_BASE_DIR/nifi-current/lib/nifi-properties-$NIFI_VERSION.jar \ org/apache/nifi/util/NiFiProperties*.class -jar uf $NIFI_BASE_DIR/nifi-current/lib/nifi-runtime-$NIFI_VERSION.jar \ - org/apache/nifi/NiFi*.class jar uf $NIFI_BASE_DIR/nifi-toolkit-current/lib/nifi-framework-core-api-$NIFI_VERSION.jar \ org/apache/nifi/controller/AbstractPort*.class jar uf $TARGETS/META-INF/bundled-dependencies/nifi-framework-nar-loading-utils-$NIFI_VERSION.jar \ @@ -52,7 +48,6 @@ jar uf $TARGETS/META-INF/bundled-dependencies/nifi-framework-nar-loading-utils-$ # patch war files cd $PATCHES jar uf $TARGETS/META-INF/bundled-dependencies/nifi-web-api-$NIFI_VERSION.war \ - WEB-INF/classes/org/apache/nifi/web/StandardNiFiServiceFacade*.class \ WEB-INF/classes/org/apache/nifi/web/api/dto/DtoFactory*.class \ WEB-INF/classes/org/apache/nifi/web/dao/impl/StandardConnectionDAO*.class set -f @@ -73,6 +68,7 @@ sed -i \ -e '/nf.FlowVerison/{r nf-flow-version-min.js' -e 'd}' \ -e '/controllerConfig/{r nf-settings-min.js' -e 'd}' \ -e '/this.breadcrumbs/{r nf-ng-breadcrumbs-controller-min.js' -e 'd}' \ + -e '/Canvas.GlobalMenuCtrl=/{r nf-ng-canvas-global-menu-controller-min.js' -e 'd}' \ -e '/processor-types-table/{r nf-ng-processor-component-min.js' -e 'd}' \ js/nf/canvas/nf-canvas-all.js sed -i \ @@ -83,19 +79,14 @@ gzip -k \ js/nf/canvas/nf-canvas-all.js \ js/nf/summary/nf-summary-all.js jar uf $TARGETS/META-INF/bundled-dependencies/nifi-web-ui-$NIFI_VERSION.war \ - $(find WEB-INF/classes/org/apache/jsp/WEB_002dINF WEB-INF/pages WEB-INF/partials css js images -type f -print) + $(find WEB-INF/classes/org/apache/jsp/WEB_002dINF WEB-INF/pages WEB-INF/partials css js images fonts -type f -print) # patch scripts cp common.sh start.sh $NIFI_BASE_DIR/scripts/ # patch nar files cd $TARGETS -cp $NIFI_BASE_DIR/nifi-toolkit-current/lib/nifi-client-dto-$NIFI_VERSION.jar \ - META-INF/bundled-dependencies/nifi-client-dto-$NIFI_VERSION.jar -jar uf $NIFI_BASE_DIR/nifi-current/lib/nifi-site-to-site-reporting-nar-$NIFI_VERSION.nar \ - META-INF/bundled-dependencies/nifi-client-dto-$NIFI_VERSION.jar cp $NIFI_BASE_DIR/nifi-toolkit-current/lib/nifi-framework-core-api-$NIFI_VERSION.jar \ META-INF/bundled-dependencies/nifi-framework-core-api-$NIFI_VERSION.jar jar uf $NIFI_BASE_DIR/nifi-current/lib/nifi-framework-nar-$NIFI_VERSION.nar \ - META-INF/bundled-dependencies/nifi-client-dto-$NIFI_VERSION.jar \ META-INF/bundled-dependencies/nifi-framework-core-api-$NIFI_VERSION.jar \ META-INF/bundled-dependencies/nifi-framework-nar-loading-utils-$NIFI_VERSION.jar \ META-INF/bundled-dependencies/nifi-jetty-$NIFI_VERSION.jar \ diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java deleted file mode 100644 index 0b033db..0000000 --- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java +++ /dev/null @@ -1,446 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Modifications to the original nifi code for the ONAP project are made - * available under the Apache License, Version 2.0 - */ -package org.apache.nifi; - -import org.apache.nifi.bundle.Bundle; -import org.apache.nifi.nar.ExtensionMapping; -import org.apache.nifi.nar.NarClassLoaders; -import org.apache.nifi.nar.NarClassLoadersHolder; -import org.apache.nifi.nar.NarUnpacker; -import org.apache.nifi.nar.SystemBundle; -import org.apache.nifi.util.FileUtils; -import org.apache.nifi.util.NiFiProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.bridge.SLF4JBridgeHandler; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.Random; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -public class NiFi { - - private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class); - private static final String KEY_FILE_FLAG = "-K"; - private final NiFiServer nifiServer; - private final BootstrapListener bootstrapListener; - - public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; - private volatile boolean shutdown = false; - - public NiFi(final NiFiProperties properties) - throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { - - this(properties, ClassLoader.getSystemClassLoader()); - - } - - public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader) - throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { - - // There can only be one krb5.conf for the overall Java process so set this globally during - // start up so that processors and our Kerberos authentication code don't have to set this - final File kerberosConfigFile = properties.getKerberosConfigurationFile(); - if (kerberosConfigFile != null) { - final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath(); - LOGGER.info("Setting java.security.krb5.conf to {}", new Object[]{kerberosConfigFilePath}); - System.setProperty("java.security.krb5.conf", kerberosConfigFilePath); - } - - setDefaultUncaughtExceptionHandler(); - - // register the shutdown hook - addShutdownHook(); - - final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY); - if (bootstrapPort != null) { - try { - final int port = Integer.parseInt(bootstrapPort); - - if (port < 1 || port > 65535) { - throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); - } - - bootstrapListener = new BootstrapListener(this, port); - bootstrapListener.start(); - } catch (final NumberFormatException nfe) { - throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); - } - } else { - LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap"); - bootstrapListener = null; - } - - // delete the web working dir - if the application does not start successfully - // the web app directories might be in an invalid state. when this happens - // jetty will not attempt to re-extract the war into the directory. by removing - // the working directory, we can be assured that it will attempt to extract the - // war every time the application starts. - File webWorkingDir = properties.getWebWorkingDirectory(); - FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true); - FileUtils.deleteFile(webWorkingDir, LOGGER, 3); - - detectTimingIssues(); - - // redirect JUL log events - initLogging(); - - final Bundle systemBundle = SystemBundle.create(properties); - - // expand the nars - final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle); - - // load the extensions classloaders - NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance(); - - narClassLoaders.init(rootClassLoader, - properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory()); - - // load the framework classloader - final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader(); - if (frameworkClassLoader == null) { - throw new IllegalStateException("Unable to find the framework NAR ClassLoader."); - } - - final Set narBundles = narClassLoaders.getBundles(); - - // load the server from the framework classloader - Thread.currentThread().setContextClassLoader(frameworkClassLoader); - Class jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader); - Constructor jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class); - - final long startTime = System.nanoTime(); - nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles); - nifiServer.setExtensionMapping(extensionMapping); - nifiServer.setBundles(systemBundle, narBundles); - - if (shutdown) { - LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller"); - } else { - nifiServer.start(); - - if (bootstrapListener != null) { - bootstrapListener.sendStartedStatus(true); - } - - final long duration = System.nanoTime() - startTime; - LOGGER.info("Controller initialization took " + duration + " nanoseconds " - + "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds)."); - } - } - - protected void setDefaultUncaughtExceptionHandler() { - Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString()); - LOGGER.error("", e); - } - }); - } - - protected void addShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - // shutdown the jetty server - shutdownHook(); - } - })); - } - - protected void initLogging() { - SLF4JBridgeHandler.removeHandlersForRootLogger(); - SLF4JBridgeHandler.install(); - } - - private static ClassLoader createBootstrapClassLoader() { - //Get list of files in bootstrap folder - final List urls = new ArrayList<>(); - try { - Files.list(Paths.get("lib/bootstrap")).forEach(p -> { - try { - urls.add(p.toUri().toURL()); - } catch (final MalformedURLException mef) { - LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef); - } - }); - } catch (IOException ioe) { - LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe); - } - //Create the bootstrap classloader - return new URLClassLoader(urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader()); - } - - protected void shutdownHook() { - try { - shutdown(); - } catch (final Throwable t) { - LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t); - } - } - - protected void shutdown() { - this.shutdown = true; - - LOGGER.info("Initiating shutdown of Jetty web server..."); - if (nifiServer != null) { - nifiServer.stop(); - } - if (bootstrapListener != null) { - bootstrapListener.stop(); - } - LOGGER.info("Jetty web server shutdown completed (nicely or otherwise)."); - } - - /** - * Determine if the machine we're running on has timing issues. - */ - private void detectTimingIssues() { - final int minRequiredOccurrences = 25; - final int maxOccurrencesOutOfRange = 15; - final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis()); - - final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() { - private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(final Runnable r) { - final Thread t = defaultFactory.newThread(r); - t.setDaemon(true); - t.setName("Detect Timing Issues"); - return t; - } - }); - - final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0); - final AtomicInteger occurrences = new AtomicInteger(0); - final Runnable command = new Runnable() { - @Override - public void run() { - final long curMillis = System.currentTimeMillis(); - final long difference = curMillis - lastTriggerMillis.get(); - final long millisOff = Math.abs(difference - 2000L); - occurrences.incrementAndGet(); - if (millisOff > 500L) { - occurrencesOutOfRange.incrementAndGet(); - } - lastTriggerMillis.set(curMillis); - } - }; - - final ScheduledFuture future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS); - - final TimerTask timerTask = new TimerTask() { - @Override - public void run() { - future.cancel(true); - service.shutdownNow(); - - if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) { - LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause " - + "Processors to be scheduled erratically. Please see the NiFi documentation for more information."); - } - } - }; - final Timer timer = new Timer(true); - timer.schedule(timerTask, 60000L); - } - - /** - * Main entry point of the application. - * - * @param args things which are ignored - */ - public static void main(String[] args) { - LOGGER.info("Launching NiFi..."); - try { - NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args); - new NiFi(properties); - } catch (final Throwable t) { - LOGGER.error("Failure to launch NiFi due to " + t, t); - } - } - - protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) { - final ClassLoader bootstrap = createBootstrapClassLoader(); - NiFiProperties properties = initializeProperties(args, bootstrap); - properties.validate(); - return properties; - } - - private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) { - // Try to get key - // If key doesn't exist, instantiate without - // Load properties - // If properties are protected and key missing, throw RuntimeException - - final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); - final String key; - try { - key = loadFormattedKey(args); - // The key might be empty or null when it is passed to the loader - } catch (IllegalArgumentException e) { - final String msg = "The bootstrap process did not provide a valid key"; - throw new IllegalArgumentException(msg, e); - } - Thread.currentThread().setContextClassLoader(boostrapLoader); - - try { - final Class propsLoaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader); - final Method withKeyMethod = propsLoaderClass.getMethod("withKey", String.class); - final Object loaderInstance = withKeyMethod.invoke(null, key); - final Method getMethod = propsLoaderClass.getMethod("get"); - final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance); - LOGGER.info("Loaded {} properties", properties.size()); - return properties; - } catch (InvocationTargetException wrappedException) { - final String msg = "There was an issue decrypting protected properties"; - throw new IllegalArgumentException(msg, wrappedException.getCause() == null ? wrappedException : wrappedException.getCause()); - } catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) { - final String msg = "Unable to access properties loader in the expected manner - apparent classpath or build issue"; - throw new IllegalArgumentException(msg, reex); - } catch (final RuntimeException e) { - final String msg = "There was an issue decrypting protected properties"; - throw new IllegalArgumentException(msg, e); - } finally { - Thread.currentThread().setContextClassLoader(contextClassLoader); - } - } - - private static String loadFormattedKey(String[] args) { - String key = null; - List parsedArgs = parseArgs(args); - // Check if args contain protection key - if (parsedArgs.contains(KEY_FILE_FLAG)) { - key = getKeyFromKeyFileAndPrune(parsedArgs); - // Format the key (check hex validity and remove spaces) - key = formatHexKey(key); - - } - - if (null == key) { - return ""; - } else if (!isHexKeyValid(key)) { - throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length"); - } else { - return key; - } - } - - private static String getKeyFromKeyFileAndPrune(List parsedArgs) { - String key = null; - LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag"); - int i = parsedArgs.indexOf(KEY_FILE_FLAG); - if (parsedArgs.size() <= i + 1) { - LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG); - throw new IllegalArgumentException("The bootstrap process provided the " + KEY_FILE_FLAG + " flag but no key"); - } - try { - String passwordfile_path = parsedArgs.get(i + 1); - // Slurp in the contents of the file: - byte[] encoded = Files.readAllBytes(Paths.get(passwordfile_path)); - key = new String(encoded,StandardCharsets.UTF_8); - if (0 == key.length()) - throw new IllegalArgumentException("Key in keyfile " + passwordfile_path + " yielded an empty key"); - - LOGGER.info("Now overwriting file in "+passwordfile_path); - - // Overwrite the contents of the file (to avoid littering file system - // unlinked with key material): - File password_file = new File(passwordfile_path); - FileWriter overwriter = new FileWriter(password_file,false); - - // Construe a random pad: - Random r = new Random(); - StringBuffer sb = new StringBuffer(); - // Note on correctness: this pad is longer, but equally sufficient. - while(sb.length() < encoded.length){ - sb.append(Integer.toHexString(r.nextInt())); - } - String pad = sb.toString(); - LOGGER.info("Overwriting key material with pad: "+pad); - overwriter.write(pad); - overwriter.close(); - - LOGGER.info("Removing/unlinking file: "+passwordfile_path); - password_file.delete(); - - } catch (IOException e) { - LOGGER.error("Caught IOException while retrieving the "+KEY_FILE_FLAG+"-passed keyfile; aborting: "+e.toString()); - System.exit(1); - } - - LOGGER.info("Read property protection key from key file provided by bootstrap process"); - return key; - } - - private static List parseArgs(String[] args) { - List parsedArgs = new ArrayList<>(Arrays.asList(args)); - for (int i = 0; i < parsedArgs.size(); i++) { - if (parsedArgs.get(i).startsWith(KEY_FILE_FLAG + " ")) { - String[] split = parsedArgs.get(i).split(" ", 2); - parsedArgs.set(i, split[0]); - parsedArgs.add(i + 1, split[1]); - break; - } - } - return parsedArgs; - } - - private static String formatHexKey(String input) { - if (input == null || input.trim().isEmpty()) { - return ""; - } - return input.replaceAll("[^0-9a-fA-F]", "").toLowerCase(); - } - - private static boolean isHexKeyValid(String key) { - if (key == null || key.trim().isEmpty()) { - return false; - } - // Key length is in "nibbles" (i.e. one hex char = 4 bits) - return Arrays.asList(128, 196, 256).contains(key.length() * 4) && key.matches("^[0-9a-fA-F]*$"); - } -} diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java index 3b341ec..b1b41c1 100644 --- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -192,7 +192,6 @@ public abstract class NiFiProperties { // ui properties public static final String UI_BANNER_TEXT = "nifi.ui.banner.text"; public static final String UI_AUTO_REFRESH_INTERVAL = "nifi.ui.autorefresh.interval"; - public static final String UI_DCAE_DISTRIBUTOR_API_URL="nifi.ui.dcae.distibutor.api.url"; // cluster common properties public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval"; @@ -717,15 +716,6 @@ public abstract class NiFiProperties { return this.getProperty(UI_BANNER_TEXT, StringUtils.EMPTY); } - - /** - * @author Renu - * @return the IP address where the nifi-app is being hosted - */ - public String getDcaeDistributorApiHostname() { - return getProperty(UI_DCAE_DISTRIBUTOR_API_URL); - } - /** * Returns the auto refresh interval in seconds. * diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java deleted file mode 100644 index 8ad05bd..0000000 --- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ /dev/null @@ -1,4899 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Modifications to the original nifi code for the ONAP project are made - * available under the Apache License, Version 2.0 - */ -package org.apache.nifi.web; - -import com.google.common.collect.Sets; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.nifi.action.Action; -import org.apache.nifi.action.Component; -import org.apache.nifi.action.FlowChangeAction; -import org.apache.nifi.action.Operation; -import org.apache.nifi.action.details.FlowChangePurgeDetails; -import org.apache.nifi.admin.service.AuditService; -import org.apache.nifi.authorization.AccessDeniedException; -import org.apache.nifi.authorization.AccessPolicy; -import org.apache.nifi.authorization.AuthorizableLookup; -import org.apache.nifi.authorization.AuthorizationRequest; -import org.apache.nifi.authorization.AuthorizationResult; -import org.apache.nifi.authorization.AuthorizationResult.Result; -import org.apache.nifi.authorization.AuthorizeAccess; -import org.apache.nifi.authorization.Authorizer; -import org.apache.nifi.authorization.Group; -import org.apache.nifi.authorization.RequestAction; -import org.apache.nifi.authorization.Resource; -import org.apache.nifi.authorization.User; -import org.apache.nifi.authorization.UserContextKeys; -import org.apache.nifi.authorization.resource.Authorizable; -import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource; -import org.apache.nifi.authorization.resource.OperationAuthorizable; -import org.apache.nifi.authorization.resource.ResourceFactory; -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.bundle.BundleCoordinate; -import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; -import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; -import org.apache.nifi.cluster.coordination.node.ClusterRoles; -import org.apache.nifi.cluster.coordination.node.DisconnectionCode; -import org.apache.nifi.cluster.coordination.node.NodeConnectionState; -import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; -import org.apache.nifi.cluster.coordination.node.OffloadCode; -import org.apache.nifi.cluster.event.NodeEvent; -import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; -import org.apache.nifi.cluster.manager.exception.UnknownNodeException; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.components.ConfigurableComponent; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.RequiredPermission; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.components.state.Scope; -import org.apache.nifi.components.state.StateMap; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Funnel; -import org.apache.nifi.connectable.Port; -import org.apache.nifi.controller.ComponentNode; -import org.apache.nifi.controller.Counter; -import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.ProcessorNode; -import org.apache.nifi.controller.ReportingTaskNode; -import org.apache.nifi.controller.ScheduledState; -import org.apache.nifi.controller.Snippet; -import org.apache.nifi.controller.Template; -import org.apache.nifi.controller.label.Label; -import org.apache.nifi.controller.leader.election.LeaderElectionManager; -import org.apache.nifi.controller.repository.claim.ContentDirection; -import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceReference; -import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.diagnostics.SystemDiagnostics; -import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.groups.ProcessGroupCounts; -import org.apache.nifi.groups.RemoteProcessGroup; -import org.apache.nifi.history.History; -import org.apache.nifi.history.HistoryQuery; -import org.apache.nifi.history.PreviousValue; -import org.apache.nifi.registry.ComponentVariableRegistry; -import org.apache.nifi.registry.authorization.Permissions; -import org.apache.nifi.registry.bucket.Bucket; -import org.apache.nifi.registry.client.NiFiRegistryException; -import org.apache.nifi.registry.flow.FlowRegistry; -import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.VersionControlInformation; -import org.apache.nifi.registry.flow.VersionedComponent; -import org.apache.nifi.registry.flow.VersionedConnection; -import org.apache.nifi.registry.flow.VersionedFlow; -import org.apache.nifi.registry.flow.VersionedFlowCoordinates; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; -import org.apache.nifi.registry.flow.VersionedFlowState; -import org.apache.nifi.registry.flow.VersionedProcessGroup; -import org.apache.nifi.registry.flow.diff.ComparableDataFlow; -import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; -import org.apache.nifi.registry.flow.diff.DifferenceType; -import org.apache.nifi.registry.flow.diff.FlowComparator; -import org.apache.nifi.registry.flow.diff.FlowComparison; -import org.apache.nifi.registry.flow.diff.FlowDifference; -import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; -import org.apache.nifi.registry.flow.diff.StandardFlowComparator; -import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; -import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; -import org.apache.nifi.remote.RemoteGroupPort; -import org.apache.nifi.remote.RootGroupPort; -import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.BulletinQuery; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.ComponentType; -import org.apache.nifi.util.BundleUtils; -import org.apache.nifi.util.FlowDifferenceFilters; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.api.dto.AccessPolicyDTO; -import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO; -import org.apache.nifi.web.api.dto.AffectedComponentDTO; -import org.apache.nifi.web.api.dto.BucketDTO; -import org.apache.nifi.web.api.dto.BulletinBoardDTO; -import org.apache.nifi.web.api.dto.BulletinDTO; -import org.apache.nifi.web.api.dto.BulletinQueryDTO; -import org.apache.nifi.web.api.dto.BundleDTO; -import org.apache.nifi.web.api.dto.ClusterDTO; -import org.apache.nifi.web.api.dto.ComponentDTO; -import org.apache.nifi.web.api.dto.ComponentDifferenceDTO; -import org.apache.nifi.web.api.dto.ComponentHistoryDTO; -import org.apache.nifi.web.api.dto.ComponentReferenceDTO; -import org.apache.nifi.web.api.dto.ComponentRestrictionPermissionDTO; -import org.apache.nifi.web.api.dto.ComponentStateDTO; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.ControllerConfigurationDTO; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; -import org.apache.nifi.web.api.dto.CounterDTO; -import org.apache.nifi.web.api.dto.CountersDTO; -import org.apache.nifi.web.api.dto.CountersSnapshotDTO; -import org.apache.nifi.web.api.dto.DocumentedTypeDTO; -import org.apache.nifi.web.api.dto.DropRequestDTO; -import org.apache.nifi.web.api.dto.DtoFactory; -import org.apache.nifi.web.api.dto.EntityFactory; -import org.apache.nifi.web.api.dto.FlowConfigurationDTO; -import org.apache.nifi.web.api.dto.FlowFileDTO; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.FunnelDTO; -import org.apache.nifi.web.api.dto.LabelDTO; -import org.apache.nifi.web.api.dto.ListingRequestDTO; -import org.apache.nifi.web.api.dto.NodeDTO; -import org.apache.nifi.web.api.dto.PermissionsDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.PreviousValueDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; -import org.apache.nifi.web.api.dto.PropertyHistoryDTO; -import org.apache.nifi.web.api.dto.RegistryDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.ReportingTaskDTO; -import org.apache.nifi.web.api.dto.RequiredPermissionDTO; -import org.apache.nifi.web.api.dto.ResourceDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.dto.SnippetDTO; -import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; -import org.apache.nifi.web.api.dto.TemplateDTO; -import org.apache.nifi.web.api.dto.UserDTO; -import org.apache.nifi.web.api.dto.UserGroupDTO; -import org.apache.nifi.web.api.dto.VariableRegistryDTO; -import org.apache.nifi.web.api.dto.VersionControlInformationDTO; -import org.apache.nifi.web.api.dto.VersionedFlowDTO; -import org.apache.nifi.web.api.dto.action.HistoryDTO; -import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; -import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO; -import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO; -import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO; -import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO; -import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO; -import org.apache.nifi.web.api.dto.flow.FlowDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; -import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; -import org.apache.nifi.web.api.dto.search.SearchResultsDTO; -import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; -import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.PortStatusDTO; -import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.entity.AccessPolicyEntity; -import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; -import org.apache.nifi.web.api.entity.ActionEntity; -import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; -import org.apache.nifi.web.api.entity.AffectedComponentEntity; -import org.apache.nifi.web.api.entity.BucketEntity; -import org.apache.nifi.web.api.entity.BulletinEntity; -import org.apache.nifi.web.api.entity.ComponentReferenceEntity; -import org.apache.nifi.web.api.entity.ConnectionEntity; -import org.apache.nifi.web.api.entity.ConnectionStatusEntity; -import org.apache.nifi.web.api.entity.ControllerBulletinsEntity; -import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; -import org.apache.nifi.web.api.entity.ControllerServiceEntity; -import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; -import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; -import org.apache.nifi.web.api.entity.CurrentUserEntity; -import org.apache.nifi.web.api.entity.FlowComparisonEntity; -import org.apache.nifi.web.api.entity.FlowConfigurationEntity; -import org.apache.nifi.web.api.entity.FlowEntity; -import org.apache.nifi.web.api.entity.FunnelEntity; -import org.apache.nifi.web.api.entity.LabelEntity; -import org.apache.nifi.web.api.entity.PortEntity; -import org.apache.nifi.web.api.entity.PortStatusEntity; -import org.apache.nifi.web.api.entity.ProcessGroupEntity; -import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; -import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; -import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity; -import org.apache.nifi.web.api.entity.ProcessorEntity; -import org.apache.nifi.web.api.entity.ProcessorStatusEntity; -import org.apache.nifi.web.api.entity.RegistryClientEntity; -import org.apache.nifi.web.api.entity.RegistryEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.ReportingTaskEntity; -import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; -import org.apache.nifi.web.api.entity.SnippetEntity; -import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; -import org.apache.nifi.web.api.entity.StatusHistoryEntity; -import org.apache.nifi.web.api.entity.TemplateEntity; -import org.apache.nifi.web.api.entity.TenantEntity; -import org.apache.nifi.web.api.entity.UserEntity; -import org.apache.nifi.web.api.entity.UserGroupEntity; -import org.apache.nifi.web.api.entity.VariableEntity; -import org.apache.nifi.web.api.entity.VariableRegistryEntity; -import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; -import org.apache.nifi.web.api.entity.VersionControlInformationEntity; -import org.apache.nifi.web.api.entity.VersionedFlowEntity; -import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; -import org.apache.nifi.web.controller.ControllerFacade; -import org.apache.nifi.web.dao.AccessPolicyDAO; -import org.apache.nifi.web.dao.ConnectionDAO; -import org.apache.nifi.web.dao.ControllerServiceDAO; -import org.apache.nifi.web.dao.FunnelDAO; -import org.apache.nifi.web.dao.LabelDAO; -import org.apache.nifi.web.dao.PortDAO; -import org.apache.nifi.web.dao.ProcessGroupDAO; -import org.apache.nifi.web.dao.ProcessorDAO; -import org.apache.nifi.web.dao.RegistryDAO; -import org.apache.nifi.web.dao.RemoteProcessGroupDAO; -import org.apache.nifi.web.dao.ReportingTaskDAO; -import org.apache.nifi.web.dao.SnippetDAO; -import org.apache.nifi.web.dao.TemplateDAO; -import org.apache.nifi.web.dao.UserDAO; -import org.apache.nifi.web.dao.UserGroupDAO; -import org.apache.nifi.web.revision.DeleteRevisionTask; -import org.apache.nifi.web.revision.ExpiredRevisionClaimException; -import org.apache.nifi.web.revision.RevisionClaim; -import org.apache.nifi.web.revision.RevisionManager; -import org.apache.nifi.web.revision.RevisionUpdate; -import org.apache.nifi.web.revision.StandardRevisionClaim; -import org.apache.nifi.web.revision.StandardRevisionUpdate; -import org.apache.nifi.web.revision.UpdateRevisionTask; -import org.apache.nifi.web.util.SnippetUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Implementation of NiFiServiceFacade that performs revision checking. - */ -public class StandardNiFiServiceFacade implements NiFiServiceFacade { - private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class); - private static final int VALIDATION_WAIT_MILLIS = 50; - - // nifi core components - private ControllerFacade controllerFacade; - private SnippetUtils snippetUtils; - - // revision manager - private RevisionManager revisionManager; - private BulletinRepository bulletinRepository; - - // data access objects - private ProcessorDAO processorDAO; - private ProcessGroupDAO processGroupDAO; - private RemoteProcessGroupDAO remoteProcessGroupDAO; - private LabelDAO labelDAO; - private FunnelDAO funnelDAO; - private SnippetDAO snippetDAO; - private PortDAO inputPortDAO; - private PortDAO outputPortDAO; - private ConnectionDAO connectionDAO; - private ControllerServiceDAO controllerServiceDAO; - private ReportingTaskDAO reportingTaskDAO; - private TemplateDAO templateDAO; - private UserDAO userDAO; - private UserGroupDAO userGroupDAO; - private AccessPolicyDAO accessPolicyDAO; - private RegistryDAO registryDAO; - private ClusterCoordinator clusterCoordinator; - private HeartbeatMonitor heartbeatMonitor; - private LeaderElectionManager leaderElectionManager; - - // administrative services - private AuditService auditService; - - // flow registry - private FlowRegistryClient flowRegistryClient; - - // properties - private NiFiProperties properties; - private DtoFactory dtoFactory; - private EntityFactory entityFactory; - - private Authorizer authorizer; - - private AuthorizableLookup authorizableLookup; - - // ----------------------------------------- - // Synchronization methods - // ----------------------------------------- - @Override - public void authorizeAccess(final AuthorizeAccess authorizeAccess) { - authorizeAccess.authorize(authorizableLookup); - } - - @Override - public void verifyRevision(final Revision revision, final NiFiUser user) { - final Revision curRevision = revisionManager.getRevision(revision.getComponentId()); - if (revision.equals(curRevision)) { - return; - } - - throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified"); - } - - @Override - public void verifyRevisions(final Set revisions, final NiFiUser user) { - for (final Revision revision : revisions) { - verifyRevision(revision, user); - } - } - - @Override - public Set getRevisionsFromGroup(final String groupId, final Function> getComponents) { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - final Set componentIds = getComponents.apply(group); - return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); - } - - @Override - public Set getRevisionsFromSnippet(final String snippetId) { - final Snippet snippet = snippetDAO.getSnippet(snippetId); - final Set componentIds = new HashSet<>(); - componentIds.addAll(snippet.getProcessors().keySet()); - componentIds.addAll(snippet.getFunnels().keySet()); - componentIds.addAll(snippet.getLabels().keySet()); - componentIds.addAll(snippet.getConnections().keySet()); - componentIds.addAll(snippet.getInputPorts().keySet()); - componentIds.addAll(snippet.getOutputPorts().keySet()); - componentIds.addAll(snippet.getProcessGroups().keySet()); - componentIds.addAll(snippet.getRemoteProcessGroups().keySet()); - return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); - } - - // ----------------------------------------- - // Verification Operations - // ----------------------------------------- - - @Override - public void verifyListQueue(final String connectionId) { - connectionDAO.verifyList(connectionId); - } - - @Override - public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) { - connectionDAO.verifyCreate(groupId, connectionDTO); - } - - @Override - public void verifyUpdateConnection(final ConnectionDTO connectionDTO) { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (connectionDAO.hasConnection(connectionDTO.getId())) { - connectionDAO.verifyUpdate(connectionDTO); - } else { - connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO); - } - } - - @Override - public void verifyDeleteConnection(final String connectionId) { - connectionDAO.verifyDelete(connectionId); - } - - @Override - public void verifyDeleteFunnel(final String funnelId) { - funnelDAO.verifyDelete(funnelId); - } - - @Override - public void verifyUpdateInputPort(final PortDTO inputPortDTO) { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (inputPortDAO.hasPort(inputPortDTO.getId())) { - inputPortDAO.verifyUpdate(inputPortDTO); - } - } - - @Override - public void verifyDeleteInputPort(final String inputPortId) { - inputPortDAO.verifyDelete(inputPortId); - } - - @Override - public void verifyUpdateOutputPort(final PortDTO outputPortDTO) { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (outputPortDAO.hasPort(outputPortDTO.getId())) { - outputPortDAO.verifyUpdate(outputPortDTO); - } - } - - @Override - public void verifyDeleteOutputPort(final String outputPortId) { - outputPortDAO.verifyDelete(outputPortId); - } - - @Override - public void verifyCreateProcessor(ProcessorDTO processorDTO) { - processorDAO.verifyCreate(processorDTO); - } - - @Override - public void verifyUpdateProcessor(final ProcessorDTO processorDTO) { - // if group does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (processorDAO.hasProcessor(processorDTO.getId())) { - processorDAO.verifyUpdate(processorDTO); - } else { - verifyCreateProcessor(processorDTO); - } - } - - @Override - public void verifyDeleteProcessor(final String processorId) { - processorDAO.verifyDelete(processorId); - } - - @Override - public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set componentIds) { - processGroupDAO.verifyScheduleComponents(groupId, state, componentIds); - } - - @Override - public void verifyEnableComponents(String processGroupId, ScheduledState state, Set componentIds) { - processGroupDAO.verifyEnableComponents(processGroupId, state, componentIds); - } - - @Override - public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection serviceIds) { - processGroupDAO.verifyActivateControllerServices(state, serviceIds); - } - - @Override - public void verifyDeleteProcessGroup(final String groupId) { - processGroupDAO.verifyDelete(groupId); - } - - @Override - public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) { - // if remote group does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) { - remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO); - } - } - - @Override - public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); - } - - @Override - public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); - } - - @Override - public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) { - remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId); - } - - @Override - public void verifyCreateControllerService(ControllerServiceDTO controllerServiceDTO) { - controllerServiceDAO.verifyCreate(controllerServiceDTO); - } - - @Override - public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) { - // if service does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) { - controllerServiceDAO.verifyUpdate(controllerServiceDTO); - } else { - verifyCreateControllerService(controllerServiceDTO); - } - } - - @Override - public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { - controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); - } - - @Override - public void verifyDeleteControllerService(final String controllerServiceId) { - controllerServiceDAO.verifyDelete(controllerServiceId); - } - - @Override - public void verifyCreateReportingTask(ReportingTaskDTO reportingTaskDTO) { - reportingTaskDAO.verifyCreate(reportingTaskDTO); - } - - @Override - public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) { - // if tasks does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) { - reportingTaskDAO.verifyUpdate(reportingTaskDTO); - } else { - verifyCreateReportingTask(reportingTaskDTO); - } - } - - @Override - public void verifyDeleteReportingTask(final String reportingTaskId) { - reportingTaskDAO.verifyDelete(reportingTaskId); - } - - // ----------------------------------------- - // Write Operations - // ----------------------------------------- - - @Override - public AccessPolicyEntity updateAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) { - final Authorizable authorizable = authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - authorizable, - () -> accessPolicyDAO.updateAccessPolicy(accessPolicyDTO), - accessPolicy -> { - final Set users = accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set userGroups = accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); - return dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference); - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizable); - return entityFactory.createAccessPolicyEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public UserEntity updateUser(final Revision revision, final UserDTO userDTO) { - final Authorizable usersAuthorizable = authorizableLookup.getTenant(); - final Set groups = userGroupDAO.getUserGroupsForUser(userDTO.getId()); - final Set policies = userGroupDAO.getAccessPoliciesForUser(userDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - usersAuthorizable, - () -> userDAO.updateUser(userDTO), - user -> { - final Set tenantEntities = groups.stream().map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - return dtoFactory.createUserDto(user, tenantEntities, policyEntities); - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(usersAuthorizable); - return entityFactory.createUserEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public UserGroupEntity updateUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) { - final Authorizable userGroupsAuthorizable = authorizableLookup.getTenant(); - final Set policies = userGroupDAO.getAccessPoliciesForUserGroup(userGroupDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - userGroupsAuthorizable, - () -> userGroupDAO.updateUserGroup(userGroupDTO), - userGroup -> { - final Set tenantEntities = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - return dtoFactory.createUserGroupDto(userGroup, tenantEntities, policyEntities); - } - ); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(userGroupsAuthorizable); - return entityFactory.createUserGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public ConnectionEntity updateConnection(final Revision revision, final ConnectionDTO connectionDTO) { - final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId()); - - final RevisionUpdate snapshot = updateComponent( - revision, - connectionNode, - () -> connectionDAO.updateConnection(connectionDTO), - connection -> dtoFactory.createConnectionDto(connection)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectionNode); - final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionNode.getIdentifier())); - return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status); - } - - @Override - public ProcessorEntity updateProcessor(final Revision revision, final ProcessorDTO processorDTO) { - // get the component, ensure we have access to it, and perform the update request - final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - processorNode, - () -> processorDAO.updateProcessor(processorDTO), - proc -> { - awaitValidationCompletion(proc); - return dtoFactory.createProcessorDto(proc); - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processorNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processorNode)); - final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorNode.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - private void awaitValidationCompletion(final ComponentNode component) { - component.getValidationStatus(VALIDATION_WAIT_MILLIS, TimeUnit.MILLISECONDS); - } - - @Override - public LabelEntity updateLabel(final Revision revision, final LabelDTO labelDTO) { - final Label labelNode = labelDAO.getLabel(labelDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - labelNode, - () -> labelDAO.updateLabel(labelDTO), - label -> dtoFactory.createLabelDto(label)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(labelNode); - return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public FunnelEntity updateFunnel(final Revision revision, final FunnelDTO funnelDTO) { - final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - funnelNode, - () -> funnelDAO.updateFunnel(funnelDTO), - funnel -> dtoFactory.createFunnelDto(funnel)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnelNode); - return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - - /** - * Updates a component with the given revision, using the provided supplier to call - * into the appropriate DAO and the provided function to convert the component into a DTO. - * - * @param revision the current revision - * @param daoUpdate a Supplier that will update the component via the appropriate DAO - * @param dtoCreation a Function to convert a component into a dao - * @param the DTO Type of the updated component - * @param the Component Type of the updated component - * @return A RevisionUpdate that represents the new configuration - */ - private RevisionUpdate updateComponent(final Revision revision, final Authorizable authorizable, final Supplier daoUpdate, final Function dtoCreation) { - try { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask() { - @Override - public RevisionUpdate update() { - // get the updated component - final C component = daoUpdate.get(); - - // save updated controller - controllerFacade.save(); - - final D dto = dtoCreation.apply(component); - - final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); - final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastModification); - } - }); - - return updatedComponent; - } catch (final ExpiredRevisionClaimException erce) { - throw new InvalidRevisionException("Failed to update component " + authorizable, erce); - } - } - - - @Override - public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set affectedComponentIds) { - // if snippet does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (snippetDAO.hasSnippet(snippetDto.getId())) { - snippetDAO.verifyUpdateSnippetComponent(snippetDto); - } - } - - @Override - public SnippetEntity updateSnippet(final Set revisions, final SnippetDTO snippetDto) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions); - - final RevisionUpdate snapshot; - try { - snapshot = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask() { - @Override - public RevisionUpdate update() { - // get the updated component - final Snippet snippet = snippetDAO.updateSnippetComponents(snippetDto); - - // drop the snippet - snippetDAO.dropSnippet(snippet.getId()); - - // save updated controller - controllerFacade.save(); - - // increment the revisions - final Set updatedRevisions = revisions.stream().map(revision -> { - final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); - return currentRevision.incrementRevision(revision.getClientId()); - }).collect(Collectors.toSet()); - - final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); - return new StandardRevisionUpdate<>(dto, null, updatedRevisions); - } - }); - } catch (final ExpiredRevisionClaimException e) { - throw new InvalidRevisionException("Failed to update Snippet", e); - } - - return entityFactory.createSnippetEntity(snapshot.getComponent()); - } - - @Override - public PortEntity updateInputPort(final Revision revision, final PortDTO inputPortDTO) { - final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - inputPortNode, - () -> inputPortDAO.updatePort(inputPortDTO), - port -> dtoFactory.createPortDto(port)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPortNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPortNode)); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortNode.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public PortEntity updateOutputPort(final Revision revision, final PortDTO outputPortDTO) { - final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - outputPortNode, - () -> outputPortDAO.updatePort(outputPortDTO), - port -> dtoFactory.createPortDto(port)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPortNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPortNode), NiFiUserUtils.getNiFiUser()); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortNode.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public RemoteProcessGroupEntity updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) { - final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); - final RevisionUpdate snapshot = updateComponent( - revision, - remoteProcessGroupNode, - () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO), - remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); - final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupNode, - controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroupNode.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort( - final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - - final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); - final RevisionUpdate snapshot = updateComponent( - revision, - remoteProcessGroupNode, - () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), - remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions); - } - - @Override - public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort( - final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - - final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); - final RevisionUpdate snapshot = updateComponent( - revision, - remoteProcessGroupNode, - () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), - remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions); - } - - @Override - public Set getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { - final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); - if (group == null) { - throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); - } - - final Map variableMap = new HashMap<>(); - variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null - .map(VariableEntity::getVariable) - .forEach(var -> variableMap.put(var.getName(), var.getValue())); - - final Set affectedComponentDtos = new HashSet<>(); - - final Set updatedVariableNames = getUpdatedVariables(group, variableMap); - for (final String variableName : updatedVariableNames) { - final Set affectedComponents = group.getComponentsAffectedByVariable(variableName); - - for (final ComponentNode component : affectedComponents) { - if (component instanceof ProcessorNode) { - final ProcessorNode procNode = (ProcessorNode) component; - if (procNode.isRunning()) { - affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode)); - } - } else if (component instanceof ControllerServiceNode) { - final ControllerServiceNode serviceNode = (ControllerServiceNode) component; - if (serviceNode.isActive()) { - affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode)); - } - } else { - throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable"); - } - } - } - - return affectedComponentDtos; - } - - @Override - public Set getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { - final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); - if (group == null) { - throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); - } - - final Map variableMap = new HashMap<>(); - variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null - .map(VariableEntity::getVariable) - .forEach(var -> variableMap.put(var.getName(), var.getValue())); - - final Set affectedComponentEntities = new HashSet<>(); - - final Set updatedVariableNames = getUpdatedVariables(group, variableMap); - for (final String variableName : updatedVariableNames) { - final Set affectedComponents = group.getComponentsAffectedByVariable(variableName); - affectedComponentEntities.addAll(dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager)); - } - - return affectedComponentEntities; - } - - private Set getUpdatedVariables(final ProcessGroup group, final Map newVariableValues) { - final Set updatedVariableNames = new HashSet<>(); - - final ComponentVariableRegistry registry = group.getVariableRegistry(); - for (final Map.Entry entry : newVariableValues.entrySet()) { - final String varName = entry.getKey(); - final String newValue = entry.getValue(); - - final String curValue = registry.getVariableValue(varName); - if (!Objects.equals(newValue, curValue)) { - updatedVariableNames.add(varName); - } - } - - return updatedVariableNames; - } - - - @Override - public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) { - final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); - final RevisionUpdate snapshot = updateComponent(revision, - processGroupNode, - () -> processGroupDAO.updateVariableRegistry(variableRegistryDto), - processGroup -> dtoFactory.createVariableRegistryDto(processGroup, revisionManager)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions); - } - - - @Override - public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) { - final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - processGroupNode, - () -> processGroupDAO.updateProcessGroup(processGroupDTO), - processGroup -> dtoFactory.createProcessGroupDto(processGroup)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities); - } - - @Override - public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) { - if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) { - processGroupDAO.verifyUpdate(processGroupDTO); - } - } - - @Override - public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map componentRevisions) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new - UpdateRevisionTask() { - @Override - public RevisionUpdate update() { - // schedule the components - processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet()); - - // update the revisions - final Map updatedRevisions = new HashMap<>(); - for (final Revision revision : componentRevisions.values()) { - final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); - updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); - } - - // save - controllerFacade.save(); - - // gather details for response - final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); - entity.setId(processGroupId); - entity.setState(state.name()); - return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); - } - }); - - return updatedComponent.getComponent(); - } - - @Override - public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map componentRevisions) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new - UpdateRevisionTask() { - @Override - public RevisionUpdate update() { - // schedule the components - processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); - - // update the revisions - final Map updatedRevisions = new HashMap<>(); - for (final Revision revision : componentRevisions.values()) { - final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); - updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); - } - - // save - controllerFacade.save(); - - // gather details for response - final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); - entity.setId(processGroupId); - entity.setState(state.name()); - return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); - } - }); - - return updatedComponent.getComponent(); - } - - @Override - public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map serviceRevisions) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user, - new UpdateRevisionTask() { - @Override - public RevisionUpdate update() { - // schedule the components - processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet()); - - // update the revisions - final Map updatedRevisions = new HashMap<>(); - for (final Revision revision : serviceRevisions.values()) { - final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); - updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); - } - - // save - controllerFacade.save(); - - // gather details for response - final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity(); - entity.setId(processGroupId); - entity.setState(state.name()); - return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); - } - }); - - return updatedComponent.getComponent(); - } - - - @Override - public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) { - final RevisionUpdate updatedComponent = updateComponent( - revision, - controllerFacade, - () -> { - if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) { - controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount()); - } - if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) { - controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount()); - } - - return controllerConfigurationDTO; - }, - controller -> dtoFactory.createControllerConfigurationDto(controllerFacade)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade); - final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(updatedComponent.getLastModification()); - return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions); - } - - - @Override - public NodeDTO updateNode(final NodeDTO nodeDTO) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - if (user == null) { - throw new WebApplicationException(new Throwable("Unable to access details for current user.")); - } - final String userDn = user.getIdentity(); - - final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeDTO.getNodeId()); - if (nodeId == null) { - throw new UnknownNodeException("No node exists with ID " + nodeDTO.getNodeId()); - } - - - if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { - clusterCoordinator.requestNodeConnect(nodeId, userDn); - } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) { - clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED, - "User " + userDn + " requested that node be offloaded"); - } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { - clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, - "User " + userDn + " requested that node be disconnected from cluster"); - } - - return getNode(nodeId); - } - - @Override - public CounterDTO updateCounter(final String counterId) { - return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId)); - } - - @Override - public void verifyCanClearProcessorState(final String processorId) { - processorDAO.verifyClearState(processorId); - } - - @Override - public void clearProcessorState(final String processorId) { - processorDAO.clearState(processorId); - } - - @Override - public void verifyCanClearControllerServiceState(final String controllerServiceId) { - controllerServiceDAO.verifyClearState(controllerServiceId); - } - - @Override - public void clearControllerServiceState(final String controllerServiceId) { - controllerServiceDAO.clearState(controllerServiceId); - } - - @Override - public void verifyCanClearReportingTaskState(final String reportingTaskId) { - reportingTaskDAO.verifyClearState(reportingTaskId); - } - - @Override - public void clearReportingTaskState(final String reportingTaskId) { - reportingTaskDAO.clearState(reportingTaskId); - } - - @Override - public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionDTO snapshot = deleteComponent( - revision, - connection.getResource(), - () -> connectionDAO.deleteConnection(connectionId), - false, // no policies to remove - dtoFactory.createConnectionDto(connection)); - - return entityFactory.createConnectionEntity(snapshot, null, permissions, null); - } - - @Override - public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) { - return dtoFactory.createDropRequestDTO(connectionDAO.deleteFlowFileDropRequest(connectionId, dropRequestId)); - } - - @Override - public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(connectionId, listingRequestId)); - - // include whether the source and destination are running - if (connection.getSource() != null) { - listRequest.setSourceRunning(connection.getSource().isRunning()); - } - if (connection.getDestination() != null) { - listRequest.setDestinationRunning(connection.getDestination().isRunning()); - } - - return listRequest; - } - - @Override - public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) { - final ProcessorNode processor = processorDAO.getProcessor(processorId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); - final ProcessorDTO snapshot = deleteComponent( - revision, - processor.getResource(), - () -> processorDAO.deleteProcessor(processorId), - true, - dtoFactory.createProcessorDto(processor)); - - return entityFactory.createProcessorEntity(snapshot, null, permissions, operatePermissions, null, null); - } - - @Override - public ProcessorEntity terminateProcessor(final String processorId) { - processorDAO.terminate(processorId); - return getProcessor(processorId); - } - - @Override - public void verifyTerminateProcessor(final String processorId) { - processorDAO.verifyTerminate(processorId); - } - - @Override - public LabelEntity deleteLabel(final Revision revision, final String labelId) { - final Label label = labelDAO.getLabel(labelId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); - final LabelDTO snapshot = deleteComponent( - revision, - label.getResource(), - () -> labelDAO.deleteLabel(labelId), - true, - dtoFactory.createLabelDto(label)); - - return entityFactory.createLabelEntity(snapshot, null, permissions); - } - - @Override - public UserEntity deleteUser(final Revision revision, final String userId) { - final User user = userDAO.getUser(userId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - final Set userGroups = user != null ? userGroupDAO.getUserGroupsForUser(userId).stream() - .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null; - final Set policyEntities = user != null ? userGroupDAO.getAccessPoliciesForUser(userId).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()) : null; - - final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userId; - final UserDTO snapshot = deleteComponent( - revision, - new Resource() { - @Override - public String getIdentifier() { - return resourceIdentifier; - } - - @Override - public String getName() { - return resourceIdentifier; - } - - @Override - public String getSafeDescription() { - return "User " + userId; - } - }, - () -> userDAO.deleteUser(userId), - false, // no user specific policies to remove - dtoFactory.createUserDto(user, userGroups, policyEntities)); - - return entityFactory.createUserEntity(snapshot, null, permissions); - } - - @Override - public UserGroupEntity deleteUserGroup(final Revision revision, final String userGroupId) { - final Group userGroup = userGroupDAO.getUserGroup(userGroupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - final Set users = userGroup != null ? userGroup.getUsers().stream() - .map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null; - final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - - final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userGroupId; - final UserGroupDTO snapshot = deleteComponent( - revision, - new Resource() { - @Override - public String getIdentifier() { - return resourceIdentifier; - } - - @Override - public String getName() { - return resourceIdentifier; - } - - @Override - public String getSafeDescription() { - return "User Group " + userGroupId; - } - }, - () -> userGroupDAO.deleteUserGroup(userGroupId), - false, // no user group specific policies to remove - dtoFactory.createUserGroupDto(userGroup, users, policyEntities)); - - return entityFactory.createUserGroupEntity(snapshot, null, permissions); - } - - @Override - public AccessPolicyEntity deleteAccessPolicy(final Revision revision, final String accessPolicyId) { - final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyId)); - final Set userGroups = accessPolicy != null ? accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null; - final Set users = accessPolicy != null ? accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null; - final AccessPolicyDTO snapshot = deleteComponent( - revision, - new Resource() { - @Override - public String getIdentifier() { - return accessPolicy.getResource(); - } - - @Override - public String getName() { - return accessPolicy.getResource(); - } - - @Override - public String getSafeDescription() { - return "Policy " + accessPolicyId; - } - }, - () -> accessPolicyDAO.deleteAccessPolicy(accessPolicyId), - false, // no need to clean up any policies as it's already been removed above - dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference)); - - return entityFactory.createAccessPolicyEntity(snapshot, null, permissions); - } - - @Override - public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) { - final Funnel funnel = funnelDAO.getFunnel(funnelId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); - final FunnelDTO snapshot = deleteComponent( - revision, - funnel.getResource(), - () -> funnelDAO.deleteFunnel(funnelId), - true, - dtoFactory.createFunnelDto(funnel)); - - return entityFactory.createFunnelEntity(snapshot, null, permissions); - } - - /** - * Deletes a component using the Optimistic Locking Manager - * - * @param revision the current revision - * @param resource the resource being removed - * @param deleteAction the action that deletes the component via the appropriate DAO object - * @param cleanUpPolicies whether or not the policies for this resource should be removed as well - not necessary when there are - * no component specific policies or if the policies of the component are inherited - * @return a dto that represents the new configuration - */ - private D deleteComponent(final Revision revision, final Resource resource, final Runnable deleteAction, final boolean cleanUpPolicies, final D dto) { - final RevisionClaim claim = new StandardRevisionClaim(revision); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask() { - @Override - public D performTask() { - logger.debug("Attempting to delete component {} with claim {}", resource.getIdentifier(), claim); - - // run the delete action - deleteAction.run(); - - // save the flow - controllerFacade.save(); - logger.debug("Deletion of component {} was successful", resource.getIdentifier()); - - if (cleanUpPolicies) { - cleanUpPolicies(resource); - } - - return dto; - } - }); - } - - /** - * Clean up the policies for the specified component resource. - * - * @param componentResource the resource for the component - */ - private void cleanUpPolicies(final Resource componentResource) { - // ensure the authorizer supports configuration - if (accessPolicyDAO.supportsConfigurableAuthorizer()) { - final List resources = new ArrayList<>(); - resources.add(componentResource); - resources.add(ResourceFactory.getDataResource(componentResource)); - resources.add(ResourceFactory.getProvenanceDataResource(componentResource)); - resources.add(ResourceFactory.getDataTransferResource(componentResource)); - resources.add(ResourceFactory.getPolicyResource(componentResource)); - - for (final Resource resource : resources) { - for (final RequestAction action : RequestAction.values()) { - try { - // since the component is being deleted, also delete any relevant access policies - final AccessPolicy readPolicy = accessPolicyDAO.getAccessPolicy(action, resource.getIdentifier()); - if (readPolicy != null) { - accessPolicyDAO.deleteAccessPolicy(readPolicy.getIdentifier()); - } - } catch (final Exception e) { - logger.warn(String.format("Unable to remove access policy for %s %s after component removal.", action, resource.getIdentifier()), e); - } - } - } - } - } - - @Override - public void verifyDeleteSnippet(final String snippetId, final Set affectedComponentIds) { - snippetDAO.verifyDeleteSnippetComponents(snippetId); - } - - @Override - public SnippetEntity deleteSnippet(final Set revisions, final String snippetId) { - final Snippet snippet = snippetDAO.getSnippet(snippetId); - - // grab the resources in the snippet so we can delete the policies afterwards - final Set snippetResources = new HashSet<>(); - snippet.getProcessors().keySet().forEach(id -> snippetResources.add(processorDAO.getProcessor(id).getResource())); - snippet.getInputPorts().keySet().forEach(id -> snippetResources.add(inputPortDAO.getPort(id).getResource())); - snippet.getOutputPorts().keySet().forEach(id -> snippetResources.add(outputPortDAO.getPort(id).getResource())); - snippet.getFunnels().keySet().forEach(id -> snippetResources.add(funnelDAO.getFunnel(id).getResource())); - snippet.getLabels().keySet().forEach(id -> snippetResources.add(labelDAO.getLabel(id).getResource())); - snippet.getRemoteProcessGroups().keySet().forEach(id -> snippetResources.add(remoteProcessGroupDAO.getRemoteProcessGroup(id).getResource())); - snippet.getProcessGroups().keySet().forEach(id -> { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id); - - // add the process group - snippetResources.add(processGroup.getResource()); - - // add each encapsulated component - processGroup.findAllProcessors().forEach(processor -> snippetResources.add(processor.getResource())); - processGroup.findAllInputPorts().forEach(inputPort -> snippetResources.add(inputPort.getResource())); - processGroup.findAllOutputPorts().forEach(outputPort -> snippetResources.add(outputPort.getResource())); - processGroup.findAllFunnels().forEach(funnel -> snippetResources.add(funnel.getResource())); - processGroup.findAllLabels().forEach(label -> snippetResources.add(label.getResource())); - processGroup.findAllProcessGroups().forEach(childGroup -> snippetResources.add(childGroup.getResource())); - processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> snippetResources.add(remoteProcessGroup.getResource())); - processGroup.findAllTemplates().forEach(template -> snippetResources.add(template.getResource())); - processGroup.findAllControllerServices().forEach(controllerService -> snippetResources.add(controllerService.getResource())); - }); - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionClaim claim = new StandardRevisionClaim(revisions); - final SnippetDTO dto = revisionManager.deleteRevision(claim, user, new DeleteRevisionTask() { - @Override - public SnippetDTO performTask() { - // delete the components in the snippet - snippetDAO.deleteSnippetComponents(snippetId); - - // drop the snippet - snippetDAO.dropSnippet(snippetId); - - // save - controllerFacade.save(); - - // create the dto for the snippet that was just removed - return dtoFactory.createSnippetDto(snippet); - } - }); - - // clean up component policies - snippetResources.forEach(resource -> cleanUpPolicies(resource)); - - return entityFactory.createSnippetEntity(dto); - } - - @Override - public PortEntity deleteInputPort(final Revision revision, final String inputPortId) { - final Port port = inputPortDAO.getPort(inputPortId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); - final PortDTO snapshot = deleteComponent( - revision, - port.getResource(), - () -> inputPortDAO.deletePort(inputPortId), - true, - dtoFactory.createPortDto(port)); - - return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null); - } - - @Override - public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) { - final Port port = outputPortDAO.getPort(outputPortId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); - final PortDTO snapshot = deleteComponent( - revision, - port.getResource(), - () -> outputPortDAO.deletePort(outputPortId), - true, - dtoFactory.createPortDto(port)); - - return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null); - } - - @Override - public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - - // grab the resources in the snippet so we can delete the policies afterwards - final Set groupResources = new HashSet<>(); - processGroup.findAllProcessors().forEach(processor -> groupResources.add(processor.getResource())); - processGroup.findAllInputPorts().forEach(inputPort -> groupResources.add(inputPort.getResource())); - processGroup.findAllOutputPorts().forEach(outputPort -> groupResources.add(outputPort.getResource())); - processGroup.findAllFunnels().forEach(funnel -> groupResources.add(funnel.getResource())); - processGroup.findAllLabels().forEach(label -> groupResources.add(label.getResource())); - processGroup.findAllProcessGroups().forEach(childGroup -> groupResources.add(childGroup.getResource())); - processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> groupResources.add(remoteProcessGroup.getResource())); - processGroup.findAllTemplates().forEach(template -> groupResources.add(template.getResource())); - processGroup.findAllControllerServices().forEach(controllerService -> groupResources.add(controllerService.getResource())); - - final ProcessGroupDTO snapshot = deleteComponent( - revision, - processGroup.getResource(), - () -> processGroupDAO.deleteProcessGroup(groupId), - true, - dtoFactory.createProcessGroupDto(processGroup)); - - // delete all applicable component policies - groupResources.forEach(groupResource -> cleanUpPolicies(groupResource)); - - return entityFactory.createProcessGroupEntity(snapshot, null, permissions, null, null); - } - - @Override - public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) { - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); - final RemoteProcessGroupDTO snapshot = deleteComponent( - revision, - remoteProcessGroup.getResource(), - () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId), - true, - dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - - return entityFactory.createRemoteProcessGroupEntity(snapshot, null, permissions, operatePermissions, null, null); - } - - @Override - public void deleteTemplate(final String id) { - // delete the template and save the flow - templateDAO.deleteTemplate(id); - controllerFacade.save(); - } - - @Override - public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) { - final RevisionUpdate snapshot = createComponent( - revision, - connectionDTO, - () -> connectionDAO.createConnection(groupId, connectionDTO), - connection -> dtoFactory.createConnectionDto(connection)); - - final Connection connection = connectionDAO.getConnection(connectionDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId())); - return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status); - } - - @Override - public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) { - return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId)); - } - - @Override - public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(connectionId, listingRequestId)); - - // include whether the source and destination are running - if (connection.getSource() != null) { - listRequest.setSourceRunning(connection.getSource().isRunning()); - } - if (connection.getDestination() != null) { - listRequest.setDestinationRunning(connection.getDestination().isRunning()); - } - - return listRequest; - } - - @Override - public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { - final RevisionUpdate snapshot = createComponent( - revision, - processorDTO, - () -> processorDAO.createProcessor(groupId, processorDTO), - processor -> { - awaitValidationCompletion(processor); - return dtoFactory.createProcessorDto(processor); - }); - - final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); - final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) { - final RevisionUpdate snapshot = createComponent( - revision, - labelDTO, - () -> labelDAO.createLabel(groupId, labelDTO), - label -> dtoFactory.createLabelDto(label)); - - final Label label = labelDAO.getLabel(labelDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); - return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - /** - * Creates a component using the optimistic locking manager. - * - * @param componentDto the DTO that will be used to create the component - * @param daoCreation A Supplier that will create the NiFi Component to use - * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO - * @param the DTO Type - * @param the NiFi Component Type - * @return a RevisionUpdate that represents the updated configuration - */ - private RevisionUpdate createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier daoCreation, final Function dtoCreation) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // read lock on the containing group - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = new StandardRevisionClaim(revision); - - // update revision through revision manager - return revisionManager.updateRevision(claim, user, () -> { - // add the component - final C component = daoCreation.get(); - - // save the flow - controllerFacade.save(); - - final D dto = dtoCreation.apply(component); - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastMod); - }); - } - - @Override - public BulletinEntity createBulletin(final BulletinDTO bulletinDTO, final Boolean canRead){ - final Bulletin bulletin = BulletinFactory.createBulletin(bulletinDTO.getCategory(),bulletinDTO.getLevel(),bulletinDTO.getMessage()); - bulletinRepository.addBulletin(bulletin); - return entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin),canRead); - } - - @Override - public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) { - final RevisionUpdate snapshot = createComponent( - revision, - funnelDTO, - () -> funnelDAO.createFunnel(groupId, funnelDTO), - funnel -> dtoFactory.createFunnelDto(funnel)); - - final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); - return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public AccessPolicyEntity createAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) { - final Authorizable tenantAuthorizable = authorizableLookup.getTenant(); - final String creator = NiFiUserUtils.getNiFiUserIdentity(); - - final AccessPolicy newAccessPolicy = accessPolicyDAO.createAccessPolicy(accessPolicyDTO); - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(newAccessPolicy.getResource()); - final AccessPolicyDTO newAccessPolicyDto = dtoFactory.createAccessPolicyDto(newAccessPolicy, - newAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()), - newAccessPolicy.getUsers().stream().map(userId -> { - final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId)); - return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(userDAO.getUser(userId)), userRevision, - dtoFactory.createPermissionsDto(tenantAuthorizable)); - }).collect(Collectors.toSet()), componentReference); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId())); - return entityFactory.createAccessPolicyEntity(newAccessPolicyDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); - } - - @Override - public UserEntity createUser(final Revision revision, final UserDTO userDTO) { - final String creator = NiFiUserUtils.getNiFiUserIdentity(); - final User newUser = userDAO.createUser(userDTO); - final Set tenantEntities = userGroupDAO.getUserGroupsForUser(newUser.getIdentifier()).stream() - .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set policyEntities = userGroupDAO.getAccessPoliciesForUser(newUser.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - final UserDTO newUserDto = dtoFactory.createUserDto(newUser, tenantEntities, policyEntities); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - return entityFactory.createUserEntity(newUserDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); - } - - private ComponentReferenceEntity createComponentReferenceEntity(final String resource) { - ComponentReferenceEntity componentReferenceEntity = null; - try { - // get the component authorizable - Authorizable componentAuthorizable = authorizableLookup.getAuthorizableFromResource(resource); - - // if this represents an authorizable whose policy permissions are enforced through the base resource, - // get the underlying base authorizable for the component reference - if (componentAuthorizable instanceof EnforcePolicyPermissionsThroughBaseResource) { - componentAuthorizable = ((EnforcePolicyPermissionsThroughBaseResource) componentAuthorizable).getBaseAuthorizable(); - } - - final ComponentReferenceDTO componentReference = dtoFactory.createComponentReferenceDto(componentAuthorizable); - if (componentReference != null) { - final PermissionsDTO componentReferencePermissions = dtoFactory.createPermissionsDto(componentAuthorizable); - final RevisionDTO componentReferenceRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(componentReference.getId())); - componentReferenceEntity = entityFactory.createComponentReferenceEntity(componentReference, componentReferenceRevision, componentReferencePermissions); - } - } catch (final ResourceNotFoundException e) { - // component not found for the specified resource - } - - return componentReferenceEntity; - } - - private AccessPolicySummaryEntity createAccessPolicySummaryEntity(final AccessPolicy ap) { - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(ap.getResource()); - final AccessPolicySummaryDTO apSummary = dtoFactory.createAccessPolicySummaryDto(ap, componentReference); - final PermissionsDTO apPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(ap.getIdentifier())); - final RevisionDTO apRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(ap.getIdentifier())); - return entityFactory.createAccessPolicySummaryEntity(apSummary, apRevision, apPermissions); - } - - @Override - public UserGroupEntity createUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) { - final String creator = NiFiUserUtils.getNiFiUserIdentity(); - final Group newUserGroup = userGroupDAO.createUserGroup(userGroupDTO); - final Set tenantEntities = newUserGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(newUserGroup.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - final UserGroupDTO newUserGroupDto = dtoFactory.createUserGroupDto(newUserGroup, tenantEntities, policyEntities); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - return entityFactory.createUserGroupEntity(newUserGroupDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); - } - - private void validateSnippetContents(final FlowSnippetDTO flow) { - // validate any processors - if (flow.getProcessors() != null) { - for (final ProcessorDTO processorDTO : flow.getProcessors()) { - final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); - processorDTO.setValidationStatus(processorNode.getValidationStatus().name()); - - final Collection validationErrors = processorNode.getValidationErrors(); - if (validationErrors != null && !validationErrors.isEmpty()) { - final List errors = new ArrayList<>(); - for (final ValidationResult validationResult : validationErrors) { - errors.add(validationResult.toString()); - } - processorDTO.setValidationErrors(errors); - } - } - } - - if (flow.getInputPorts() != null) { - for (final PortDTO portDTO : flow.getInputPorts()) { - final Port port = inputPortDAO.getPort(portDTO.getId()); - final Collection validationErrors = port.getValidationErrors(); - if (validationErrors != null && !validationErrors.isEmpty()) { - final List errors = new ArrayList<>(); - for (final ValidationResult validationResult : validationErrors) { - errors.add(validationResult.toString()); - } - portDTO.setValidationErrors(errors); - } - } - } - - if (flow.getOutputPorts() != null) { - for (final PortDTO portDTO : flow.getOutputPorts()) { - final Port port = outputPortDAO.getPort(portDTO.getId()); - final Collection validationErrors = port.getValidationErrors(); - if (validationErrors != null && !validationErrors.isEmpty()) { - final List errors = new ArrayList<>(); - for (final ValidationResult validationResult : validationErrors) { - errors.add(validationResult.toString()); - } - portDTO.setValidationErrors(errors); - } - } - } - - // get any remote process group issues - if (flow.getRemoteProcessGroups() != null) { - for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) { - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); - - if (remoteProcessGroup.getAuthorizationIssue() != null) { - remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue())); - } - } - } - } - - @Override - public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) { - // create the new snippet - final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); - - // save the flow - controllerFacade.save(); - - // drop the snippet - snippetDAO.dropSnippet(snippetId); - - // post process new flow snippet - final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); - - final FlowEntity flowEntity = new FlowEntity(); - flowEntity.setFlow(flowDto); - return flowEntity; - } - - @Override - public SnippetEntity createSnippet(final SnippetDTO snippetDTO) { - // add the component - final Snippet snippet = snippetDAO.createSnippet(snippetDTO); - - // save the flow - controllerFacade.save(); - - final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); - final RevisionUpdate snapshot = new StandardRevisionUpdate<>(dto, null); - - return entityFactory.createSnippetEntity(snapshot.getComponent()); - } - - @Override - public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) { - final RevisionUpdate snapshot = createComponent( - revision, - inputPortDTO, - () -> inputPortDAO.createPort(groupId, inputPortDTO), - port -> dtoFactory.createPortDto(port)); - - final Port port = inputPortDAO.getPort(inputPortDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) { - final RevisionUpdate snapshot = createComponent( - revision, - outputPortDTO, - () -> outputPortDAO.createPort(groupId, outputPortDTO), - port -> dtoFactory.createPortDto(port)); - - final Port port = outputPortDAO.getPort(outputPortDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public ProcessGroupEntity createProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) { - final RevisionUpdate snapshot = createComponent( - revision, - processGroupDTO, - () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO), - processGroup -> dtoFactory.createProcessGroupDto(processGroup)); - - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities); - } - - @Override - public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) { - final RevisionUpdate snapshot = createComponent( - revision, - remoteProcessGroupDTO, - () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO), - remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); - final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), - permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) { - final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); - RemoteGroupPort port = rpg.getInputPort(remotePortId); - if (port != null) { - return port.hasIncomingConnection(); - } - - port = rpg.getOutputPort(remotePortId); - if (port != null) { - return !port.getConnections().isEmpty(); - } - - throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId); - } - - @Override - public void verifyCanAddTemplate(String groupId, String name) { - templateDAO.verifyCanAddTemplate(name, groupId); - } - - @Override - public void verifyComponentTypes(FlowSnippetDTO snippet) { - templateDAO.verifyComponentTypes(snippet); - } - - @Override - public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) { - controllerFacade.verifyComponentTypes(versionedGroup); - } - - @Override - public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - verifyImportProcessGroup(versionControlInfo, contents, group); - } - - private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) { - if (group == null) { - return; - } - - final VersionControlInformation vci = group.getVersionControlInformation(); - if (vci != null) { - // Note that we do not compare the Registry ID here because there could be two registry clients - // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance).. - if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier()) - && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) { - - throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. " - + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A."); - } - } - - final Set childGroups = contents.getProcessGroups(); - if (childGroups != null) { - for (final VersionedProcessGroup childGroup : childGroups) { - final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates(); - if (childCoordinates != null) { - final VersionControlInformationDTO childVci = new VersionControlInformationDTO(); - childVci.setBucketId(childCoordinates.getBucketId()); - childVci.setFlowId(childCoordinates.getFlowId()); - verifyImportProcessGroup(childVci, childGroup, group); - } - } - } - - verifyImportProcessGroup(vciDto, contents, group.getParent()); - } - - @Override - public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional idGenerationSeed) { - // get the specified snippet - final Snippet snippet = snippetDAO.getSnippet(snippetId); - - // create the template - final TemplateDTO templateDTO = new TemplateDTO(); - templateDTO.setName(name); - templateDTO.setDescription(description); - templateDTO.setTimestamp(new Date()); - templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true, true)); - templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION); - - // set the id based on the specified seed - final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); - templateDTO.setId(uuid); - - // create the template - final Template template = templateDAO.createTemplate(templateDTO, groupId); - - // drop the snippet - snippetDAO.dropSnippet(snippetId); - - // save the flow - controllerFacade.save(); - - return dtoFactory.createTemplateDTO(template); - } - - /** - * Ensures default values are populated for all components in this snippet. This is necessary to handle old templates without default values - * and when existing properties have default values introduced. - * - * @param snippet snippet - */ - private void ensureDefaultPropertyValuesArePopulated(final FlowSnippetDTO snippet) { - if (snippet != null) { - if (snippet.getControllerServices() != null) { - snippet.getControllerServices().forEach(dto -> { - if (dto.getProperties() == null) { - dto.setProperties(new LinkedHashMap<>()); - } - - try { - final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle()); - configurableComponent.getPropertyDescriptors().forEach(descriptor -> { - if (dto.getProperties().get(descriptor.getName()) == null) { - dto.getProperties().put(descriptor.getName(), descriptor.getDefaultValue()); - } - }); - } catch (final Exception e) { - logger.warn(String.format("Unable to create ControllerService of type %s to populate default values.", dto.getType())); - } - }); - } - - if (snippet.getProcessors() != null) { - snippet.getProcessors().forEach(dto -> { - if (dto.getConfig() == null) { - dto.setConfig(new ProcessorConfigDTO()); - } - - final ProcessorConfigDTO config = dto.getConfig(); - if (config.getProperties() == null) { - config.setProperties(new LinkedHashMap<>()); - } - - try { - final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle()); - configurableComponent.getPropertyDescriptors().forEach(descriptor -> { - if (config.getProperties().get(descriptor.getName()) == null) { - config.getProperties().put(descriptor.getName(), descriptor.getDefaultValue()); - } - }); - } catch (final Exception e) { - logger.warn(String.format("Unable to create Processor of type %s to populate default values.", dto.getType())); - } - }); - } - - if (snippet.getProcessGroups() != null) { - snippet.getProcessGroups().forEach(processGroup -> { - ensureDefaultPropertyValuesArePopulated(processGroup.getContents()); - }); - } - } - } - - @Override - public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional idGenerationSeed) { - // ensure id is set - final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); - templateDTO.setId(uuid); - - // mark the timestamp - templateDTO.setTimestamp(new Date()); - - // ensure default values are populated - ensureDefaultPropertyValuesArePopulated(templateDTO.getSnippet()); - - // import the template - final Template template = templateDAO.importTemplate(templateDTO, groupId); - - // save the flow - controllerFacade.save(); - - // return the template dto - return dtoFactory.createTemplateDTO(template); - } - - /** - * Post processes a new flow snippet including validation, removing the snippet, and DTO conversion. - * - * @param groupId group id - * @param snippet snippet - * @return flow dto - */ - private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) { - // validate the new snippet - validateSnippetContents(snippet); - - // identify all components added - final Set identifiers = new HashSet<>(); - snippet.getProcessors().stream() - .map(proc -> proc.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getConnections().stream() - .map(conn -> conn.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getInputPorts().stream() - .map(port -> port.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getOutputPorts().stream() - .map(port -> port.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getProcessGroups().stream() - .map(group -> group.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .map(remoteGroup -> remoteGroup.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null) - .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream()) - .map(remoteInputPort -> remoteInputPort.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null) - .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream()) - .map(remoteOutputPort -> remoteOutputPort.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getLabels().stream() - .map(label -> label.getId()) - .forEach(id -> identifiers.add(id)); - - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); - return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins); - } - - @Override - public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateEncodingVersion, - final FlowSnippetDTO requestSnippet, final String idGenerationSeed) { - - // instantiate the template - there is no need to make another copy of the flow snippet since the actual template - // was copied and this dto is only used to instantiate it's components (which as already completed) - final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateEncodingVersion, requestSnippet, idGenerationSeed); - - // save the flow - controllerFacade.save(); - - // post process the new flow snippet - final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); - - final FlowEntity flowEntity = new FlowEntity(); - flowEntity.setFlow(flowDto); - return flowEntity; - } - - @Override - public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) { - controllerServiceDTO.setParentGroupId(groupId); - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = new StandardRevisionClaim(revision); - - final RevisionUpdate snapshot; - if (groupId == null) { - // update revision through revision manager - snapshot = revisionManager.updateRevision(claim, user, () -> { - // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock - // on the group. The Controller Service may or may not have a Process Group (it won't if it's controller-scoped). - final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); - controllerFacade.save(); - - awaitValidationCompletion(controllerService); - final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); - - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastMod); - }); - } else { - snapshot = revisionManager.updateRevision(claim, user, () -> { - final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); - controllerFacade.save(); - - awaitValidationCompletion(controllerService); - final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); - - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastMod); - }); - } - - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); - } - - @Override - public ControllerServiceEntity updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) { - // get the component, ensure we have access to it, and perform the update request - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - controllerService, - () -> controllerServiceDAO.updateControllerService(controllerServiceDTO), - cs -> { - awaitValidationCompletion(cs); - final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(cs); - final ControllerServiceReference ref = controllerService.getReferences(); - final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = - createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerService.getIdentifier())); - dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents()); - return dto; - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); - } - - - @Override - public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents( - final Map referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { - - final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values()); - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionUpdate update = revisionManager.updateRevision(claim, user, - new UpdateRevisionTask() { - @Override - public RevisionUpdate update() { - final Set updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); - final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences(); - - // get the revisions of the updated components - final Map updatedRevisions = new HashMap<>(); - for (final ComponentNode component : updated) { - final Revision currentRevision = revisionManager.getRevision(component.getIdentifier()); - final Revision requestRevision = referenceRevisions.get(component.getIdentifier()); - updatedRevisions.put(component.getIdentifier(), currentRevision.incrementRevision(requestRevision.getClientId())); - } - - // ensure the revision for all referencing components is included regardless of whether they were updated in this request - for (final ComponentNode component : updatedReference.findRecursiveReferences(ComponentNode.class)) { - updatedRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); - } - - final ControllerServiceReferencingComponentsEntity entity = createControllerServiceReferencingComponentsEntity(updatedReference, updatedRevisions); - return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); - } - }); - - return update.getComponent(); - } - - /** - * Finds the identifiers for all components referencing a ControllerService. - * - * @param reference ControllerServiceReference - * @param visited ControllerServices we've already visited - */ - private void findControllerServiceReferencingComponentIdentifiers(final ControllerServiceReference reference, final Set visited) { - for (final ComponentNode component : reference.getReferencingComponents()) { - - // if this is a ControllerService consider it's referencing components - if (component instanceof ControllerServiceNode) { - final ControllerServiceNode node = (ControllerServiceNode) component; - if (!visited.contains(node)) { - visited.add(node); - findControllerServiceReferencingComponentIdentifiers(node.getReferences(), visited); - } - } - } - } - - /** - * Creates entities for components referencing a ControllerService using their current revision. - * - * @param reference ControllerServiceReference - * @return The entity - */ - private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set lockedIds) { - final Set visited = new HashSet<>(); - visited.add(reference.getReferencedComponent()); - findControllerServiceReferencingComponentIdentifiers(reference, visited); - - final Map referencingRevisions = new HashMap<>(); - for (final ComponentNode component : reference.getReferencingComponents()) { - referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); - } - - return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions); - } - - /** - * Creates entities for components referencing a ControllerService using the specified revisions. - * - * @param reference ControllerServiceReference - * @param revisions The revisions - * @return The entity - */ - private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity( - final ControllerServiceReference reference, final Map revisions) { - final Set visited = new HashSet<>(); - visited.add(reference.getReferencedComponent()); - return createControllerServiceReferencingComponentsEntity(reference, revisions, visited); - } - - /** - * Creates entities for components referencing a ControllerServcie using the specified revisions. - * - * @param reference ControllerServiceReference - * @param revisions The revisions - * @param visited Which services we've already considered (in case of cycle) - * @return The entity - */ - private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity( - final ControllerServiceReference reference, final Map revisions, final Set visited) { - - final String modifier = NiFiUserUtils.getNiFiUserIdentity(); - final Set referencingComponents = reference.getReferencingComponents(); - - final Set componentEntities = new HashSet<>(); - for (final ComponentNode refComponent : referencingComponents) { - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(refComponent); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(refComponent)); - - final Revision revision = revisions.get(refComponent.getIdentifier()); - final FlowModification flowMod = new FlowModification(revision, modifier); - final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(flowMod); - final ControllerServiceReferencingComponentDTO dto = dtoFactory.createControllerServiceReferencingComponentDTO(refComponent); - - if (refComponent instanceof ControllerServiceNode) { - final ControllerServiceNode node = (ControllerServiceNode) refComponent; - - // indicate if we've hit a cycle - dto.setReferenceCycle(visited.contains(node)); - - // mark node as visited before building the reference cycle - visited.add(node); - - // if we haven't encountered this service before include it's referencing components - if (!dto.getReferenceCycle()) { - final ControllerServiceReference refReferences = node.getReferences(); - final Map referencingRevisions = new HashMap<>(revisions); - for (final ComponentNode component : refReferences.getReferencingComponents()) { - referencingRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); - } - final ControllerServiceReferencingComponentsEntity references = createControllerServiceReferencingComponentsEntity(refReferences, referencingRevisions, visited); - dto.setReferencingComponents(references.getControllerServiceReferencingComponents()); - } - } - - componentEntities.add(entityFactory.createControllerServiceReferencingComponentEntity(refComponent.getIdentifier(), dto, revisionDto, permissions, operatePermissions)); - } - - final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity(); - entity.setControllerServiceReferencingComponents(componentEntities); - return entity; - } - - @Override - public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) { - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); - final ControllerServiceDTO snapshot = deleteComponent( - revision, - controllerService.getResource(), - () -> controllerServiceDAO.deleteControllerService(controllerServiceId), - true, - dtoFactory.createControllerServiceDto(controllerService)); - - return entityFactory.createControllerServiceEntity(snapshot, null, permissions, operatePermissions, null); - } - - - @Override - public RegistryClientEntity createRegistryClient(Revision revision, RegistryDTO registryDTO) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = new StandardRevisionClaim(revision); - - // update revision through revision manager - final RevisionUpdate revisionUpdate = revisionManager.updateRevision(claim, user, () -> { - // add the component - final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO); - - // save the flow - controllerFacade.save(); - - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(registry, lastMod); - }); - - final FlowRegistry registry = revisionUpdate.getComponent(); - return createRegistryClientEntity(registry); - } - - @Override - public RegistryClientEntity getRegistryClient(final String registryId) { - final FlowRegistry registry = registryDAO.getFlowRegistry(registryId); - return createRegistryClientEntity(registry); - } - - private RegistryClientEntity createRegistryClientEntity(final FlowRegistry flowRegistry) { - if (flowRegistry == null) { - return null; - } - - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(flowRegistry.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getController()); - final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry); - - return entityFactory.createRegistryClientEntity(dto, revision, permissions); - } - - private VersionedFlowEntity createVersionedFlowEntity(final String registryId, final VersionedFlow versionedFlow) { - if (versionedFlow == null) { - return null; - } - - final VersionedFlowDTO dto = new VersionedFlowDTO(); - dto.setRegistryId(registryId); - dto.setBucketId(versionedFlow.getBucketIdentifier()); - dto.setFlowId(versionedFlow.getIdentifier()); - dto.setFlowName(versionedFlow.getName()); - dto.setDescription(versionedFlow.getDescription()); - - final VersionedFlowEntity entity = new VersionedFlowEntity(); - entity.setVersionedFlow(dto); - - return entity; - } - - private VersionedFlowSnapshotMetadataEntity createVersionedFlowSnapshotMetadataEntity(final String registryId, final VersionedFlowSnapshotMetadata metadata) { - if (metadata == null) { - return null; - } - - final VersionedFlowSnapshotMetadataEntity entity = new VersionedFlowSnapshotMetadataEntity(); - entity.setRegistryId(registryId); - entity.setVersionedFlowMetadata(metadata); - - return entity; - } - - @Override - public Set getRegistryClients() { - return registryDAO.getFlowRegistries().stream() - .map(this::createRegistryClientEntity) - .collect(Collectors.toSet()); - } - - @Override - public Set getRegistriesForUser(final NiFiUser user) { - return registryDAO.getFlowRegistriesForUser(user).stream() - .map(flowRegistry -> entityFactory.createRegistryEntity(dtoFactory.createRegistryDto(flowRegistry))) - .collect(Collectors.toSet()); - } - - @Override - public Set getBucketsForUser(final String registryId, final NiFiUser user) { - return registryDAO.getBucketsForUser(registryId, user).stream() - .map(bucket -> { - if (bucket == null) { - return null; - } - - final BucketDTO dto = new BucketDTO(); - dto.setId(bucket.getIdentifier()); - dto.setName(bucket.getName()); - dto.setDescription(bucket.getDescription()); - dto.setCreated(bucket.getCreatedTimestamp()); - - final Permissions regPermissions = bucket.getPermissions(); - final PermissionsDTO permissions = new PermissionsDTO(); - permissions.setCanRead(regPermissions.getCanRead()); - permissions.setCanWrite(regPermissions.getCanWrite()); - - return entityFactory.createBucketEntity(dto, permissions); - }) - .collect(Collectors.toSet()); - } - - @Override - public Set getFlowsForUser(String registryId, String bucketId, NiFiUser user) { - return registryDAO.getFlowsForUser(registryId, bucketId, user).stream() - .map(vf -> createVersionedFlowEntity(registryId, vf)) - .collect(Collectors.toSet()); - } - - @Override - public Set getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) { - return registryDAO.getFlowVersionsForUser(registryId, bucketId, flowId, user).stream() - .map(md -> createVersionedFlowSnapshotMetadataEntity(registryId, md)) - .collect(Collectors.toSet()); - } - - @Override - public RegistryClientEntity updateRegistryClient(Revision revision, RegistryDTO registryDTO) { - final RevisionClaim revisionClaim = new StandardRevisionClaim(revision); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId()); - final RevisionUpdate revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> { - final boolean duplicateName = registryDAO.getFlowRegistries().stream() - .anyMatch(reg -> reg.getName().equals(registryDTO.getName()) && !reg.getIdentifier().equals(registryDTO.getId())); - - if (duplicateName) { - throw new IllegalStateException("Cannot update Flow Registry because a Flow Registry already exists with the name " + registryDTO.getName()); - } - - registry.setDescription(registryDTO.getDescription()); - registry.setName(registryDTO.getName()); - registry.setURL(registryDTO.getUri()); - - controllerFacade.save(); - - final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); - final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); - - return new StandardRevisionUpdate<>(registry, lastModification); - }); - - final FlowRegistry updatedReg = revisionUpdate.getComponent(); - return createRegistryClientEntity(updatedReg); - } - - @Override - public void verifyDeleteRegistry(String registryId) { - processGroupDAO.verifyDeleteFlowRegistry(registryId); - } - - @Override - public RegistryClientEntity deleteRegistryClient(final Revision revision, final String registryId) { - final RevisionClaim claim = new StandardRevisionClaim(revision); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> { - final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId); - controllerFacade.save(); - return reg; - }); - - return createRegistryClientEntity(registry); - } - - @Override - public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = new StandardRevisionClaim(revision); - - // update revision through revision manager - final RevisionUpdate snapshot = revisionManager.updateRevision(claim, user, () -> { - // create the reporting task - final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO); - - // save the update - controllerFacade.save(); - awaitValidationCompletion(reportingTask); - - final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastMod); - }); - - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); - } - - @Override - public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { - // get the component, ensure we have access to it, and perform the update request - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); - final RevisionUpdate snapshot = updateComponent(revision, - reportingTask, - () -> reportingTaskDAO.updateReportingTask(reportingTaskDTO), - rt -> { - awaitValidationCompletion(rt); - return dtoFactory.createReportingTaskDto(rt); - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); - } - - @Override - public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) { - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); - final ReportingTaskDTO snapshot = deleteComponent( - revision, - reportingTask.getResource(), - () -> reportingTaskDAO.deleteReportingTask(reportingTaskId), - true, - dtoFactory.createReportingTaskDto(reportingTask)); - - return entityFactory.createReportingTaskEntity(snapshot, null, permissions, operatePermissions, null); - } - - @Override - public void deleteActions(final Date endDate) { - // get the user from the request - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - if (user == null) { - throw new WebApplicationException(new Throwable("Unable to access details for current user.")); - } - - // create the purge details - final FlowChangePurgeDetails details = new FlowChangePurgeDetails(); - details.setEndDate(endDate); - - // create a purge action to record that records are being removed - final FlowChangeAction purgeAction = new FlowChangeAction(); - purgeAction.setUserIdentity(user.getIdentity()); - purgeAction.setOperation(Operation.Purge); - purgeAction.setTimestamp(new Date()); - purgeAction.setSourceId("Flow Controller"); - purgeAction.setSourceName("History"); - purgeAction.setSourceType(Component.Controller); - purgeAction.setActionDetails(details); - - // purge corresponding actions - auditService.purgeActions(endDate, purgeAction); - } - - @Override - public ProvenanceDTO submitProvenance(final ProvenanceDTO query) { - return controllerFacade.submitProvenance(query); - } - - @Override - public void deleteProvenance(final String queryId) { - controllerFacade.deleteProvenanceQuery(queryId); - } - - @Override - public LineageDTO submitLineage(final LineageDTO lineage) { - return controllerFacade.submitLineage(lineage); - } - - @Override - public void deleteLineage(final String lineageId) { - controllerFacade.deleteLineage(lineageId); - } - - @Override - public ProvenanceEventDTO submitReplay(final Long eventId) { - return controllerFacade.submitReplay(eventId); - } - - // ----------------------------------------- - // Read Operations - // ----------------------------------------- - - @Override - public SearchResultsDTO searchController(final String query) { - return controllerFacade.search(query); - } - - @Override - public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) { - return connectionDAO.getContent(connectionId, flowFileUuid, uri); - } - - @Override - public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) { - return controllerFacade.getContent(eventId, uri, contentDirection); - } - - @Override - public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) { - return controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults); - } - - @Override - public LineageDTO getLineage(final String lineageId) { - return controllerFacade.getLineage(lineageId); - } - - @Override - public ProvenanceOptionsDTO getProvenanceSearchOptions() { - return controllerFacade.getProvenanceSearchOptions(); - } - - @Override - public ProvenanceEventDTO getProvenanceEvent(final Long id) { - return controllerFacade.getProvenanceEvent(id); - } - - @Override - public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - final ProcessGroupStatusDTO dto = dtoFactory.createProcessGroupStatusDto(processGroup, controllerFacade.getProcessGroupStatus(groupId)); - - // prune the response as necessary - if (!recursive) { - pruneChildGroups(dto.getAggregateSnapshot()); - if (dto.getNodeSnapshots() != null) { - for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : dto.getNodeSnapshots()) { - pruneChildGroups(nodeSnapshot.getStatusSnapshot()); - } - } - } - - return entityFactory.createProcessGroupStatusEntity(dto, permissions); - } - - private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) { - for (final ProcessGroupStatusSnapshotEntity childProcessGroupStatusEntity : snapshot.getProcessGroupStatusSnapshots()) { - final ProcessGroupStatusSnapshotDTO childProcessGroupStatus = childProcessGroupStatusEntity.getProcessGroupStatusSnapshot(); - childProcessGroupStatus.setConnectionStatusSnapshots(null); - childProcessGroupStatus.setProcessGroupStatusSnapshots(null); - childProcessGroupStatus.setInputPortStatusSnapshots(null); - childProcessGroupStatus.setOutputPortStatusSnapshots(null); - childProcessGroupStatus.setProcessorStatusSnapshots(null); - childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null); - } - } - - @Override - public ControllerStatusDTO getControllerStatus() { - return controllerFacade.getControllerStatus(); - } - - @Override - public ComponentStateDTO getProcessorState(final String processorId) { - final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; - final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); - - // processor will be non null as it was already found when getting the state - final ProcessorNode processor = processorDAO.getProcessor(processorId); - return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState); - } - - @Override - public ComponentStateDTO getControllerServiceState(final String controllerServiceId) { - final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; - final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); - - // controller service will be non null as it was already found when getting the state - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState); - } - - @Override - public ComponentStateDTO getReportingTaskState(final String reportingTaskId) { - final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; - final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); - - // reporting task will be non null as it was already found when getting the state - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); - return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState); - } - - @Override - public CountersDTO getCounters() { - final List counters = controllerFacade.getCounters(); - final Set counterDTOs = new LinkedHashSet<>(counters.size()); - for (final Counter counter : counters) { - counterDTOs.add(dtoFactory.createCounterDto(counter)); - } - - final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs); - final CountersDTO countersDto = new CountersDTO(); - countersDto.setAggregateSnapshot(snapshotDto); - - return countersDto; - } - - private ConnectionEntity createConnectionEntity(final Connection connection) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier())); - return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, permissions, status); - } - - @Override - public Set getConnections(final String groupId) { - final Set connections = connectionDAO.getConnections(groupId); - return connections.stream() - .map(connection -> createConnectionEntity(connection)) - .collect(Collectors.toSet()); - } - - @Override - public ConnectionEntity getConnection(final String connectionId) { - final Connection connection = connectionDAO.getConnection(connectionId); - return createConnectionEntity(connection); - } - - @Override - public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) { - return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId)); - } - - @Override - public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(connectionId, listingRequestId)); - - // include whether the source and destination are running - if (connection.getSource() != null) { - listRequest.setSourceRunning(connection.getSource().isRunning()); - } - if (connection.getDestination() != null) { - listRequest.setDestinationRunning(connection.getDestination().isRunning()); - } - - return listRequest; - } - - @Override - public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) { - return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid)); - } - - @Override - public ConnectionStatusEntity getConnectionStatus(final String connectionId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionStatusDTO dto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId)); - return entityFactory.createConnectionStatusEntity(dto, permissions); - } - - @Override - public StatusHistoryEntity getConnectionStatusHistory(final String connectionId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final StatusHistoryDTO dto = controllerFacade.getConnectionStatusHistory(connectionId); - return entityFactory.createStatusHistoryEntity(dto, permissions); - } - - private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); - final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); - final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public Set getProcessors(final String groupId, final boolean includeDescendants) { - final Set processors = processorDAO.getProcessors(groupId, includeDescendants); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - return processors.stream() - .map(processor -> createProcessorEntity(processor, user)) - .collect(Collectors.toSet()); - } - - @Override - public TemplateDTO exportTemplate(final String id) { - final Template template = templateDAO.getTemplate(id); - final TemplateDTO templateDetails = template.getDetails(); - - final TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template); - templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet())); - return templateDTO; - } - - @Override - public TemplateDTO getTemplate(final String id) { - return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id)); - } - - @Override - public Set getTemplates() { - return templateDAO.getTemplates().stream() - .map(template -> { - final TemplateDTO dto = dtoFactory.createTemplateDTO(template); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(template); - - final TemplateEntity entity = new TemplateEntity(); - entity.setId(dto.getId()); - entity.setPermissions(permissions); - entity.setTemplate(dto); - return entity; - }).collect(Collectors.toSet()); - } - - @Override - public Set getWorkQueuePrioritizerTypes() { - return controllerFacade.getFlowFileComparatorTypes(); - } - - @Override - public Set getProcessorTypes(final String bundleGroup, final String bundleArtifact, final String type) { - return controllerFacade.getFlowFileProcessorTypes(bundleGroup, bundleArtifact, type); - } - - @Override - public Set getControllerServiceTypes(final String serviceType, final String serviceBundleGroup, final String serviceBundleArtifact, final String serviceBundleVersion, - final String bundleGroup, final String bundleArtifact, final String type) { - return controllerFacade.getControllerServiceTypes(serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion, bundleGroup, bundleArtifact, type); - } - - @Override - public Set getReportingTaskTypes(final String bundleGroup, final String bundleArtifact, final String type) { - return controllerFacade.getReportingTaskTypes(bundleGroup, bundleArtifact, type); - } - - @Override - public ProcessorEntity getProcessor(final String id) { - final ProcessorNode processor = processorDAO.getProcessor(id); - return createProcessorEntity(processor, NiFiUserUtils.getNiFiUser()); - } - - @Override - public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) { - final ProcessorNode processor = processorDAO.getProcessor(id); - PropertyDescriptor descriptor = processor.getPropertyDescriptor(property); - - // return an invalid descriptor if the processor doesn't support this property - if (descriptor == null) { - descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build(); - } - - return dtoFactory.createPropertyDescriptorDto(descriptor, processor.getProcessGroup().getIdentifier()); - } - - @Override - public ProcessorStatusEntity getProcessorStatus(final String id) { - final ProcessorNode processor = processorDAO.getProcessor(id); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); - final ProcessorStatusDTO dto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id)); - return entityFactory.createProcessorStatusEntity(dto, permissions); - } - - @Override - public StatusHistoryEntity getProcessorStatusHistory(final String id) { - final ProcessorNode processor = processorDAO.getProcessor(id); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); - final StatusHistoryDTO dto = controllerFacade.getProcessorStatusHistory(id); - return entityFactory.createStatusHistoryEntity(dto, permissions); - } - - private boolean authorizeBulletin(final Bulletin bulletin) { - final String sourceId = bulletin.getSourceId(); - final ComponentType type = bulletin.getSourceType(); - - final Authorizable authorizable; - try { - switch (type) { - case PROCESSOR: - authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable(); - break; - case REPORTING_TASK: - authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable(); - break; - case CONTROLLER_SERVICE: - authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable(); - break; - case FLOW_CONTROLLER: - authorizable = controllerFacade; - break; - case INPUT_PORT: - authorizable = authorizableLookup.getInputPort(sourceId); - break; - case OUTPUT_PORT: - authorizable = authorizableLookup.getOutputPort(sourceId); - break; - case REMOTE_PROCESS_GROUP: - authorizable = authorizableLookup.getRemoteProcessGroup(sourceId); - break; - default: - throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build()); - } - } catch (final ResourceNotFoundException e) { - // if the underlying component is gone, disallow - return false; - } - - // perform the authorization - final AuthorizationResult result = authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - return Result.Approved.equals(result.getResult()); - } - - @Override - public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) { - // build the query - final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder() - .groupIdMatches(query.getGroupId()) - .sourceIdMatches(query.getSourceId()) - .nameMatches(query.getName()) - .messageMatches(query.getMessage()) - .after(query.getAfter()) - .limit(query.getLimit()); - - // perform the query - final List results = bulletinRepository.findBulletins(queryBuilder.build()); - - // perform the query and generate the results - iterating in reverse order since we are - // getting the most recent results by ordering by timestamp desc above. this gets the - // exact results we want but in reverse order - final List bulletinEntities = new ArrayList<>(); - for (final ListIterator bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious(); ) { - final Bulletin bulletin = bulletinIter.previous(); - bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin))); - } - - // create the bulletin board - final BulletinBoardDTO bulletinBoard = new BulletinBoardDTO(); - bulletinBoard.setBulletins(bulletinEntities); - bulletinBoard.setGenerated(new Date()); - return bulletinBoard; - } - - @Override - public SystemDiagnosticsDTO getSystemDiagnostics() { - final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics(); - return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics); - } - - @Override - public List getResources() { - final List resources = controllerFacade.getResources(); - final List resourceDtos = new ArrayList<>(resources.size()); - for (final Resource resource : resources) { - resourceDtos.add(dtoFactory.createResourceDto(resource)); - } - return resourceDtos; - } - - @Override - public void discoverCompatibleBundles(VersionedProcessGroup versionedGroup) { - BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), versionedGroup); - } - - @Override - public BundleCoordinate getCompatibleBundle(String type, BundleDTO bundleDTO) { - return BundleUtils.getCompatibleBundle(controllerFacade.getExtensionManager(), type, bundleDTO); - } - - @Override - public ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate) { - return controllerFacade.getExtensionManager().getTempComponent(classType, bundleCoordinate); - } - - /** - * Ensures the specified user has permission to access the specified port. This method does - * not utilize the DataTransferAuthorizable as that will enforce the entire chain is - * authorized for the transfer. This method is only invoked when obtaining the site to site - * details so the entire chain isn't necessary. - */ - private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) { - final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure()); - - // if site to site is not secure, allow all users - if (!isSiteToSiteSecure) { - return true; - } - - final Map userContext; - if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) { - userContext = new HashMap<>(); - userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress()); - } else { - userContext = null; - } - - final AuthorizationRequest request = new AuthorizationRequest.Builder() - .resource(ResourceFactory.getDataTransferResource(port.getResource())) - .identity(user.getIdentity()) - .groups(user.getGroups()) - .anonymous(user.isAnonymous()) - .accessAttempt(false) - .action(RequestAction.WRITE) - .userContext(userContext) - .explanationSupplier(() -> "Unable to retrieve port details.") - .build(); - - final AuthorizationResult result = authorizer.authorize(request); - return Result.Approved.equals(result.getResult()); - } - - @Override - public ControllerDTO getSiteToSiteDetails() { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - if (user == null) { - throw new WebApplicationException(new Throwable("Unable to access details for current user.")); - } - - // serialize the input ports this NiFi has access to - final Set inputPortDtos = new LinkedHashSet<>(); - final Set inputPorts = controllerFacade.getInputPorts(); - for (final RootGroupPort inputPort : inputPorts) { - if (isUserAuthorized(user, inputPort)) { - final PortDTO dto = new PortDTO(); - dto.setId(inputPort.getIdentifier()); - dto.setName(inputPort.getName()); - dto.setComments(inputPort.getComments()); - dto.setState(inputPort.getScheduledState().toString()); - inputPortDtos.add(dto); - } - } - - // serialize the output ports this NiFi has access to - final Set outputPortDtos = new LinkedHashSet<>(); - for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) { - if (isUserAuthorized(user, outputPort)) { - final PortDTO dto = new PortDTO(); - dto.setId(outputPort.getIdentifier()); - dto.setName(outputPort.getName()); - dto.setComments(outputPort.getComments()); - dto.setState(outputPort.getScheduledState().toString()); - outputPortDtos.add(dto); - } - } - - // get the root group - final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); - final ProcessGroupCounts counts = rootGroup.getCounts(); - - // create the controller dto - final ControllerDTO controllerDTO = new ControllerDTO(); - controllerDTO.setId(controllerFacade.getRootGroupId()); - controllerDTO.setInstanceId(controllerFacade.getInstanceId()); - controllerDTO.setName(controllerFacade.getName()); - controllerDTO.setComments(controllerFacade.getComments()); - controllerDTO.setInputPorts(inputPortDtos); - controllerDTO.setOutputPorts(outputPortDtos); - controllerDTO.setInputPortCount(inputPortDtos.size()); - controllerDTO.setOutputPortCount(outputPortDtos.size()); - controllerDTO.setRunningCount(counts.getRunningCount()); - controllerDTO.setStoppedCount(counts.getStoppedCount()); - controllerDTO.setInvalidCount(counts.getInvalidCount()); - controllerDTO.setDisabledCount(counts.getDisabledCount()); - - // determine the site to site configuration - controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort()); - controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort()); - controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure()); - - return controllerDTO; - } - - @Override - public ControllerConfigurationEntity getControllerConfiguration() { - final Revision rev = revisionManager.getRevision(FlowController.class.getSimpleName()); - final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade); - final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); - return entityFactory.createControllerConfigurationEntity(dto, revision, permissions); - } - - @Override - public ControllerBulletinsEntity getControllerBulletins() { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity(); - - final List controllerBulletinEntities = new ArrayList<>(); - - final Authorizable controllerAuthorizable = authorizableLookup.getController(); - final boolean authorized = controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()); - controllerBulletinEntities.addAll(bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList())); - - // get the controller service bulletins - final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); - final List allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery); - final List controllerServiceBulletinEntities = new ArrayList<>(); - for (final Bulletin bulletin : allControllerServiceBulletins) { - try { - final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable(); - final boolean controllerServiceAuthorized = controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); - - final BulletinEntity controllerServiceBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized); - controllerServiceBulletinEntities.add(controllerServiceBulletin); - controllerBulletinEntities.add(controllerServiceBulletin); - } catch (final ResourceNotFoundException e) { - // controller service missing.. skip - } - } - controllerBulletinsEntity.setControllerServiceBulletins(controllerServiceBulletinEntities); - - // get the reporting task bulletins - final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); - final List allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery); - final List reportingTaskBulletinEntities = new ArrayList<>(); - for (final Bulletin bulletin : allReportingTaskBulletins) { - try { - final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable(); - final boolean reportingTaskAuthorizableAuthorized = reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); - - final BulletinEntity reportingTaskBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized); - reportingTaskBulletinEntities.add(reportingTaskBulletin); - controllerBulletinEntities.add(reportingTaskBulletin); - } catch (final ResourceNotFoundException e) { - // reporting task missing.. skip - } - } - controllerBulletinsEntity.setReportingTaskBulletins(reportingTaskBulletinEntities); - - controllerBulletinsEntity.setBulletins(pruneAndSortBulletins(controllerBulletinEntities, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER)); - return controllerBulletinsEntity; - } - - @Override - public FlowConfigurationEntity getFlowConfiguration() { - final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval(), - properties.getDefaultBackPressureObjectThreshold(), properties.getDefaultBackPressureDataSizeThreshold(),properties.getDcaeDistributorApiHostname()); - final FlowConfigurationEntity entity = new FlowConfigurationEntity(); - entity.setFlowConfiguration(dto); - return entity; - } - - @Override - public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) { - final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); - return createAccessPolicyEntity(accessPolicy); - } - - @Override - public AccessPolicyEntity getAccessPolicy(final RequestAction requestAction, final String resource) { - Authorizable authorizable; - try { - authorizable = authorizableLookup.getAuthorizableFromResource(resource); - } catch (final ResourceNotFoundException e) { - // unable to find the underlying authorizable... user authorized based on top level /policies... create - // an anonymous authorizable to attempt to locate an existing policy for this resource - authorizable = new Authorizable() { - @Override - public Authorizable getParentAuthorizable() { - return null; - } - - @Override - public Resource getResource() { - return new Resource() { - @Override - public String getIdentifier() { - return resource; - } - - @Override - public String getName() { - return resource; - } - - @Override - public String getSafeDescription() { - return "Policy " + resource; - } - }; - } - }; - } - - final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(requestAction, authorizable); - return createAccessPolicyEntity(accessPolicy); - } - - private AccessPolicyEntity createAccessPolicyEntity(final AccessPolicy accessPolicy) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicy.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicy.getIdentifier())); - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); - return entityFactory.createAccessPolicyEntity( - dtoFactory.createAccessPolicyDto(accessPolicy, - accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()), - accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()), componentReference), - revision, permissions); - } - - @Override - public UserEntity getUser(final String userId) { - final User user = userDAO.getUser(userId); - return createUserEntity(user, true); - } - - @Override - public Set getUsers() { - final Set users = userDAO.getUsers(); - return users.stream() - .map(user -> createUserEntity(user, false)) - .collect(Collectors.toSet()); - } - - private UserEntity createUserEntity(final User user, final boolean enforceUserExistence) { - final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - final Set userGroups = userGroupDAO.getUserGroupsForUser(user.getIdentifier()).stream() - .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(enforceUserExistence)).collect(Collectors.toSet()); - final Set policyEntities = userGroupDAO.getAccessPoliciesForUser(user.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups, policyEntities), userRevision, permissions); - } - - private UserGroupEntity createUserGroupEntity(final Group userGroup, final boolean enforceGroupExistence) { - final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - final Set users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(enforceGroupExistence)).collect(Collectors.toSet()); - final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users, policyEntities), userGroupRevision, permissions); - } - - @Override - public UserGroupEntity getUserGroup(final String userGroupId) { - final Group userGroup = userGroupDAO.getUserGroup(userGroupId); - return createUserGroupEntity(userGroup, true); - } - - @Override - public Set getUserGroups() { - final Set userGroups = userGroupDAO.getUserGroups(); - return userGroups.stream() - .map(userGroup -> createUserGroupEntity(userGroup, false)) - .collect(Collectors.toSet()); - } - - private LabelEntity createLabelEntity(final Label label) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); - return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), revision, permissions); - } - - @Override - public Set getLabels(final String groupId) { - final Set