diff options
Diffstat (limited to 'mod/designtool/designtool-web/src/main/java')
5 files changed, 76 insertions, 5617 deletions
diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java deleted file mode 100644 index 0b033db..0000000 --- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java +++ /dev/null @@ -1,446 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Modifications to the original nifi code for the ONAP project are made - * available under the Apache License, Version 2.0 - */ -package org.apache.nifi; - -import org.apache.nifi.bundle.Bundle; -import org.apache.nifi.nar.ExtensionMapping; -import org.apache.nifi.nar.NarClassLoaders; -import org.apache.nifi.nar.NarClassLoadersHolder; -import org.apache.nifi.nar.NarUnpacker; -import org.apache.nifi.nar.SystemBundle; -import org.apache.nifi.util.FileUtils; -import org.apache.nifi.util.NiFiProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.bridge.SLF4JBridgeHandler; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.Random; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -public class NiFi { - - private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class); - private static final String KEY_FILE_FLAG = "-K"; - private final NiFiServer nifiServer; - private final BootstrapListener bootstrapListener; - - public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; - private volatile boolean shutdown = false; - - public NiFi(final NiFiProperties properties) - throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { - - this(properties, ClassLoader.getSystemClassLoader()); - - } - - public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader) - throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { - - // There can only be one krb5.conf for the overall Java process so set this globally during - // start up so that processors and our Kerberos authentication code don't have to set this - final File kerberosConfigFile = properties.getKerberosConfigurationFile(); - if (kerberosConfigFile != null) { - final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath(); - LOGGER.info("Setting java.security.krb5.conf to {}", new Object[]{kerberosConfigFilePath}); - System.setProperty("java.security.krb5.conf", kerberosConfigFilePath); - } - - setDefaultUncaughtExceptionHandler(); - - // register the shutdown hook - addShutdownHook(); - - final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY); - if (bootstrapPort != null) { - try { - final int port = Integer.parseInt(bootstrapPort); - - if (port < 1 || port > 65535) { - throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); - } - - bootstrapListener = new BootstrapListener(this, port); - bootstrapListener.start(); - } catch (final NumberFormatException nfe) { - throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); - } - } else { - LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap"); - bootstrapListener = null; - } - - // delete the web working dir - if the application does not start successfully - // the web app directories might be in an invalid state. when this happens - // jetty will not attempt to re-extract the war into the directory. by removing - // the working directory, we can be assured that it will attempt to extract the - // war every time the application starts. - File webWorkingDir = properties.getWebWorkingDirectory(); - FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true); - FileUtils.deleteFile(webWorkingDir, LOGGER, 3); - - detectTimingIssues(); - - // redirect JUL log events - initLogging(); - - final Bundle systemBundle = SystemBundle.create(properties); - - // expand the nars - final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle); - - // load the extensions classloaders - NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance(); - - narClassLoaders.init(rootClassLoader, - properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory()); - - // load the framework classloader - final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader(); - if (frameworkClassLoader == null) { - throw new IllegalStateException("Unable to find the framework NAR ClassLoader."); - } - - final Set<Bundle> narBundles = narClassLoaders.getBundles(); - - // load the server from the framework classloader - Thread.currentThread().setContextClassLoader(frameworkClassLoader); - Class<?> jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader); - Constructor<?> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class); - - final long startTime = System.nanoTime(); - nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles); - nifiServer.setExtensionMapping(extensionMapping); - nifiServer.setBundles(systemBundle, narBundles); - - if (shutdown) { - LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller"); - } else { - nifiServer.start(); - - if (bootstrapListener != null) { - bootstrapListener.sendStartedStatus(true); - } - - final long duration = System.nanoTime() - startTime; - LOGGER.info("Controller initialization took " + duration + " nanoseconds " - + "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds)."); - } - } - - protected void setDefaultUncaughtExceptionHandler() { - Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString()); - LOGGER.error("", e); - } - }); - } - - protected void addShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - // shutdown the jetty server - shutdownHook(); - } - })); - } - - protected void initLogging() { - SLF4JBridgeHandler.removeHandlersForRootLogger(); - SLF4JBridgeHandler.install(); - } - - private static ClassLoader createBootstrapClassLoader() { - //Get list of files in bootstrap folder - final List<URL> urls = new ArrayList<>(); - try { - Files.list(Paths.get("lib/bootstrap")).forEach(p -> { - try { - urls.add(p.toUri().toURL()); - } catch (final MalformedURLException mef) { - LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef); - } - }); - } catch (IOException ioe) { - LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe); - } - //Create the bootstrap classloader - return new URLClassLoader(urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader()); - } - - protected void shutdownHook() { - try { - shutdown(); - } catch (final Throwable t) { - LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t); - } - } - - protected void shutdown() { - this.shutdown = true; - - LOGGER.info("Initiating shutdown of Jetty web server..."); - if (nifiServer != null) { - nifiServer.stop(); - } - if (bootstrapListener != null) { - bootstrapListener.stop(); - } - LOGGER.info("Jetty web server shutdown completed (nicely or otherwise)."); - } - - /** - * Determine if the machine we're running on has timing issues. - */ - private void detectTimingIssues() { - final int minRequiredOccurrences = 25; - final int maxOccurrencesOutOfRange = 15; - final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis()); - - final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() { - private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(final Runnable r) { - final Thread t = defaultFactory.newThread(r); - t.setDaemon(true); - t.setName("Detect Timing Issues"); - return t; - } - }); - - final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0); - final AtomicInteger occurrences = new AtomicInteger(0); - final Runnable command = new Runnable() { - @Override - public void run() { - final long curMillis = System.currentTimeMillis(); - final long difference = curMillis - lastTriggerMillis.get(); - final long millisOff = Math.abs(difference - 2000L); - occurrences.incrementAndGet(); - if (millisOff > 500L) { - occurrencesOutOfRange.incrementAndGet(); - } - lastTriggerMillis.set(curMillis); - } - }; - - final ScheduledFuture<?> future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS); - - final TimerTask timerTask = new TimerTask() { - @Override - public void run() { - future.cancel(true); - service.shutdownNow(); - - if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) { - LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause " - + "Processors to be scheduled erratically. Please see the NiFi documentation for more information."); - } - } - }; - final Timer timer = new Timer(true); - timer.schedule(timerTask, 60000L); - } - - /** - * Main entry point of the application. - * - * @param args things which are ignored - */ - public static void main(String[] args) { - LOGGER.info("Launching NiFi..."); - try { - NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args); - new NiFi(properties); - } catch (final Throwable t) { - LOGGER.error("Failure to launch NiFi due to " + t, t); - } - } - - protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) { - final ClassLoader bootstrap = createBootstrapClassLoader(); - NiFiProperties properties = initializeProperties(args, bootstrap); - properties.validate(); - return properties; - } - - private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) { - // Try to get key - // If key doesn't exist, instantiate without - // Load properties - // If properties are protected and key missing, throw RuntimeException - - final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); - final String key; - try { - key = loadFormattedKey(args); - // The key might be empty or null when it is passed to the loader - } catch (IllegalArgumentException e) { - final String msg = "The bootstrap process did not provide a valid key"; - throw new IllegalArgumentException(msg, e); - } - Thread.currentThread().setContextClassLoader(boostrapLoader); - - try { - final Class<?> propsLoaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader); - final Method withKeyMethod = propsLoaderClass.getMethod("withKey", String.class); - final Object loaderInstance = withKeyMethod.invoke(null, key); - final Method getMethod = propsLoaderClass.getMethod("get"); - final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance); - LOGGER.info("Loaded {} properties", properties.size()); - return properties; - } catch (InvocationTargetException wrappedException) { - final String msg = "There was an issue decrypting protected properties"; - throw new IllegalArgumentException(msg, wrappedException.getCause() == null ? wrappedException : wrappedException.getCause()); - } catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) { - final String msg = "Unable to access properties loader in the expected manner - apparent classpath or build issue"; - throw new IllegalArgumentException(msg, reex); - } catch (final RuntimeException e) { - final String msg = "There was an issue decrypting protected properties"; - throw new IllegalArgumentException(msg, e); - } finally { - Thread.currentThread().setContextClassLoader(contextClassLoader); - } - } - - private static String loadFormattedKey(String[] args) { - String key = null; - List<String> parsedArgs = parseArgs(args); - // Check if args contain protection key - if (parsedArgs.contains(KEY_FILE_FLAG)) { - key = getKeyFromKeyFileAndPrune(parsedArgs); - // Format the key (check hex validity and remove spaces) - key = formatHexKey(key); - - } - - if (null == key) { - return ""; - } else if (!isHexKeyValid(key)) { - throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length"); - } else { - return key; - } - } - - private static String getKeyFromKeyFileAndPrune(List<String> parsedArgs) { - String key = null; - LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag"); - int i = parsedArgs.indexOf(KEY_FILE_FLAG); - if (parsedArgs.size() <= i + 1) { - LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG); - throw new IllegalArgumentException("The bootstrap process provided the " + KEY_FILE_FLAG + " flag but no key"); - } - try { - String passwordfile_path = parsedArgs.get(i + 1); - // Slurp in the contents of the file: - byte[] encoded = Files.readAllBytes(Paths.get(passwordfile_path)); - key = new String(encoded,StandardCharsets.UTF_8); - if (0 == key.length()) - throw new IllegalArgumentException("Key in keyfile " + passwordfile_path + " yielded an empty key"); - - LOGGER.info("Now overwriting file in "+passwordfile_path); - - // Overwrite the contents of the file (to avoid littering file system - // unlinked with key material): - File password_file = new File(passwordfile_path); - FileWriter overwriter = new FileWriter(password_file,false); - - // Construe a random pad: - Random r = new Random(); - StringBuffer sb = new StringBuffer(); - // Note on correctness: this pad is longer, but equally sufficient. - while(sb.length() < encoded.length){ - sb.append(Integer.toHexString(r.nextInt())); - } - String pad = sb.toString(); - LOGGER.info("Overwriting key material with pad: "+pad); - overwriter.write(pad); - overwriter.close(); - - LOGGER.info("Removing/unlinking file: "+passwordfile_path); - password_file.delete(); - - } catch (IOException e) { - LOGGER.error("Caught IOException while retrieving the "+KEY_FILE_FLAG+"-passed keyfile; aborting: "+e.toString()); - System.exit(1); - } - - LOGGER.info("Read property protection key from key file provided by bootstrap process"); - return key; - } - - private static List<String> parseArgs(String[] args) { - List<String> parsedArgs = new ArrayList<>(Arrays.asList(args)); - for (int i = 0; i < parsedArgs.size(); i++) { - if (parsedArgs.get(i).startsWith(KEY_FILE_FLAG + " ")) { - String[] split = parsedArgs.get(i).split(" ", 2); - parsedArgs.set(i, split[0]); - parsedArgs.add(i + 1, split[1]); - break; - } - } - return parsedArgs; - } - - private static String formatHexKey(String input) { - if (input == null || input.trim().isEmpty()) { - return ""; - } - return input.replaceAll("[^0-9a-fA-F]", "").toLowerCase(); - } - - private static boolean isHexKeyValid(String key) { - if (key == null || key.trim().isEmpty()) { - return false; - } - // Key length is in "nibbles" (i.e. one hex char = 4 bits) - return Arrays.asList(128, 196, 256).contains(key.length() * 4) && key.matches("^[0-9a-fA-F]*$"); - } -} diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java index 3b341ec..b1b41c1 100644 --- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -192,7 +192,6 @@ public abstract class NiFiProperties { // ui properties public static final String UI_BANNER_TEXT = "nifi.ui.banner.text"; public static final String UI_AUTO_REFRESH_INTERVAL = "nifi.ui.autorefresh.interval"; - public static final String UI_DCAE_DISTRIBUTOR_API_URL="nifi.ui.dcae.distibutor.api.url"; // cluster common properties public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval"; @@ -717,15 +716,6 @@ public abstract class NiFiProperties { return this.getProperty(UI_BANNER_TEXT, StringUtils.EMPTY); } - - /** - * @author Renu - * @return the IP address where the nifi-app is being hosted - */ - public String getDcaeDistributorApiHostname() { - return getProperty(UI_DCAE_DISTRIBUTOR_API_URL); - } - /** * Returns the auto refresh interval in seconds. * diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java deleted file mode 100644 index 8ad05bd..0000000 --- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ /dev/null @@ -1,4899 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Modifications to the original nifi code for the ONAP project are made - * available under the Apache License, Version 2.0 - */ -package org.apache.nifi.web; - -import com.google.common.collect.Sets; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.nifi.action.Action; -import org.apache.nifi.action.Component; -import org.apache.nifi.action.FlowChangeAction; -import org.apache.nifi.action.Operation; -import org.apache.nifi.action.details.FlowChangePurgeDetails; -import org.apache.nifi.admin.service.AuditService; -import org.apache.nifi.authorization.AccessDeniedException; -import org.apache.nifi.authorization.AccessPolicy; -import org.apache.nifi.authorization.AuthorizableLookup; -import org.apache.nifi.authorization.AuthorizationRequest; -import org.apache.nifi.authorization.AuthorizationResult; -import org.apache.nifi.authorization.AuthorizationResult.Result; -import org.apache.nifi.authorization.AuthorizeAccess; -import org.apache.nifi.authorization.Authorizer; -import org.apache.nifi.authorization.Group; -import org.apache.nifi.authorization.RequestAction; -import org.apache.nifi.authorization.Resource; -import org.apache.nifi.authorization.User; -import org.apache.nifi.authorization.UserContextKeys; -import org.apache.nifi.authorization.resource.Authorizable; -import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource; -import org.apache.nifi.authorization.resource.OperationAuthorizable; -import org.apache.nifi.authorization.resource.ResourceFactory; -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.bundle.BundleCoordinate; -import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; -import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; -import org.apache.nifi.cluster.coordination.node.ClusterRoles; -import org.apache.nifi.cluster.coordination.node.DisconnectionCode; -import org.apache.nifi.cluster.coordination.node.NodeConnectionState; -import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; -import org.apache.nifi.cluster.coordination.node.OffloadCode; -import org.apache.nifi.cluster.event.NodeEvent; -import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; -import org.apache.nifi.cluster.manager.exception.UnknownNodeException; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.components.ConfigurableComponent; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.RequiredPermission; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.components.state.Scope; -import org.apache.nifi.components.state.StateMap; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Funnel; -import org.apache.nifi.connectable.Port; -import org.apache.nifi.controller.ComponentNode; -import org.apache.nifi.controller.Counter; -import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.ProcessorNode; -import org.apache.nifi.controller.ReportingTaskNode; -import org.apache.nifi.controller.ScheduledState; -import org.apache.nifi.controller.Snippet; -import org.apache.nifi.controller.Template; -import org.apache.nifi.controller.label.Label; -import org.apache.nifi.controller.leader.election.LeaderElectionManager; -import org.apache.nifi.controller.repository.claim.ContentDirection; -import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceReference; -import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.diagnostics.SystemDiagnostics; -import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.groups.ProcessGroupCounts; -import org.apache.nifi.groups.RemoteProcessGroup; -import org.apache.nifi.history.History; -import org.apache.nifi.history.HistoryQuery; -import org.apache.nifi.history.PreviousValue; -import org.apache.nifi.registry.ComponentVariableRegistry; -import org.apache.nifi.registry.authorization.Permissions; -import org.apache.nifi.registry.bucket.Bucket; -import org.apache.nifi.registry.client.NiFiRegistryException; -import org.apache.nifi.registry.flow.FlowRegistry; -import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.VersionControlInformation; -import org.apache.nifi.registry.flow.VersionedComponent; -import org.apache.nifi.registry.flow.VersionedConnection; -import org.apache.nifi.registry.flow.VersionedFlow; -import org.apache.nifi.registry.flow.VersionedFlowCoordinates; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; -import org.apache.nifi.registry.flow.VersionedFlowState; -import org.apache.nifi.registry.flow.VersionedProcessGroup; -import org.apache.nifi.registry.flow.diff.ComparableDataFlow; -import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; -import org.apache.nifi.registry.flow.diff.DifferenceType; -import org.apache.nifi.registry.flow.diff.FlowComparator; -import org.apache.nifi.registry.flow.diff.FlowComparison; -import org.apache.nifi.registry.flow.diff.FlowDifference; -import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; -import org.apache.nifi.registry.flow.diff.StandardFlowComparator; -import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor; -import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; -import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; -import org.apache.nifi.remote.RemoteGroupPort; -import org.apache.nifi.remote.RootGroupPort; -import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.BulletinQuery; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.ComponentType; -import org.apache.nifi.util.BundleUtils; -import org.apache.nifi.util.FlowDifferenceFilters; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.api.dto.AccessPolicyDTO; -import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO; -import org.apache.nifi.web.api.dto.AffectedComponentDTO; -import org.apache.nifi.web.api.dto.BucketDTO; -import org.apache.nifi.web.api.dto.BulletinBoardDTO; -import org.apache.nifi.web.api.dto.BulletinDTO; -import org.apache.nifi.web.api.dto.BulletinQueryDTO; -import org.apache.nifi.web.api.dto.BundleDTO; -import org.apache.nifi.web.api.dto.ClusterDTO; -import org.apache.nifi.web.api.dto.ComponentDTO; -import org.apache.nifi.web.api.dto.ComponentDifferenceDTO; -import org.apache.nifi.web.api.dto.ComponentHistoryDTO; -import org.apache.nifi.web.api.dto.ComponentReferenceDTO; -import org.apache.nifi.web.api.dto.ComponentRestrictionPermissionDTO; -import org.apache.nifi.web.api.dto.ComponentStateDTO; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.ControllerConfigurationDTO; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; -import org.apache.nifi.web.api.dto.CounterDTO; -import org.apache.nifi.web.api.dto.CountersDTO; -import org.apache.nifi.web.api.dto.CountersSnapshotDTO; -import org.apache.nifi.web.api.dto.DocumentedTypeDTO; -import org.apache.nifi.web.api.dto.DropRequestDTO; -import org.apache.nifi.web.api.dto.DtoFactory; -import org.apache.nifi.web.api.dto.EntityFactory; -import org.apache.nifi.web.api.dto.FlowConfigurationDTO; -import org.apache.nifi.web.api.dto.FlowFileDTO; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.FunnelDTO; -import org.apache.nifi.web.api.dto.LabelDTO; -import org.apache.nifi.web.api.dto.ListingRequestDTO; -import org.apache.nifi.web.api.dto.NodeDTO; -import org.apache.nifi.web.api.dto.PermissionsDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.PreviousValueDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; -import org.apache.nifi.web.api.dto.PropertyHistoryDTO; -import org.apache.nifi.web.api.dto.RegistryDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.ReportingTaskDTO; -import org.apache.nifi.web.api.dto.RequiredPermissionDTO; -import org.apache.nifi.web.api.dto.ResourceDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.dto.SnippetDTO; -import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; -import org.apache.nifi.web.api.dto.TemplateDTO; -import org.apache.nifi.web.api.dto.UserDTO; -import org.apache.nifi.web.api.dto.UserGroupDTO; -import org.apache.nifi.web.api.dto.VariableRegistryDTO; -import org.apache.nifi.web.api.dto.VersionControlInformationDTO; -import org.apache.nifi.web.api.dto.VersionedFlowDTO; -import org.apache.nifi.web.api.dto.action.HistoryDTO; -import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; -import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO; -import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO; -import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO; -import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO; -import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO; -import org.apache.nifi.web.api.dto.flow.FlowDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; -import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; -import org.apache.nifi.web.api.dto.search.SearchResultsDTO; -import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; -import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.PortStatusDTO; -import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.entity.AccessPolicyEntity; -import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; -import org.apache.nifi.web.api.entity.ActionEntity; -import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; -import org.apache.nifi.web.api.entity.AffectedComponentEntity; -import org.apache.nifi.web.api.entity.BucketEntity; -import org.apache.nifi.web.api.entity.BulletinEntity; -import org.apache.nifi.web.api.entity.ComponentReferenceEntity; -import org.apache.nifi.web.api.entity.ConnectionEntity; -import org.apache.nifi.web.api.entity.ConnectionStatusEntity; -import org.apache.nifi.web.api.entity.ControllerBulletinsEntity; -import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; -import org.apache.nifi.web.api.entity.ControllerServiceEntity; -import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; -import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; -import org.apache.nifi.web.api.entity.CurrentUserEntity; -import org.apache.nifi.web.api.entity.FlowComparisonEntity; -import org.apache.nifi.web.api.entity.FlowConfigurationEntity; -import org.apache.nifi.web.api.entity.FlowEntity; -import org.apache.nifi.web.api.entity.FunnelEntity; -import org.apache.nifi.web.api.entity.LabelEntity; -import org.apache.nifi.web.api.entity.PortEntity; -import org.apache.nifi.web.api.entity.PortStatusEntity; -import org.apache.nifi.web.api.entity.ProcessGroupEntity; -import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; -import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; -import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity; -import org.apache.nifi.web.api.entity.ProcessorEntity; -import org.apache.nifi.web.api.entity.ProcessorStatusEntity; -import org.apache.nifi.web.api.entity.RegistryClientEntity; -import org.apache.nifi.web.api.entity.RegistryEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.ReportingTaskEntity; -import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; -import org.apache.nifi.web.api.entity.SnippetEntity; -import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; -import org.apache.nifi.web.api.entity.StatusHistoryEntity; -import org.apache.nifi.web.api.entity.TemplateEntity; -import org.apache.nifi.web.api.entity.TenantEntity; -import org.apache.nifi.web.api.entity.UserEntity; -import org.apache.nifi.web.api.entity.UserGroupEntity; -import org.apache.nifi.web.api.entity.VariableEntity; -import org.apache.nifi.web.api.entity.VariableRegistryEntity; -import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; -import org.apache.nifi.web.api.entity.VersionControlInformationEntity; -import org.apache.nifi.web.api.entity.VersionedFlowEntity; -import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; -import org.apache.nifi.web.controller.ControllerFacade; -import org.apache.nifi.web.dao.AccessPolicyDAO; -import org.apache.nifi.web.dao.ConnectionDAO; -import org.apache.nifi.web.dao.ControllerServiceDAO; -import org.apache.nifi.web.dao.FunnelDAO; -import org.apache.nifi.web.dao.LabelDAO; -import org.apache.nifi.web.dao.PortDAO; -import org.apache.nifi.web.dao.ProcessGroupDAO; -import org.apache.nifi.web.dao.ProcessorDAO; -import org.apache.nifi.web.dao.RegistryDAO; -import org.apache.nifi.web.dao.RemoteProcessGroupDAO; -import org.apache.nifi.web.dao.ReportingTaskDAO; -import org.apache.nifi.web.dao.SnippetDAO; -import org.apache.nifi.web.dao.TemplateDAO; -import org.apache.nifi.web.dao.UserDAO; -import org.apache.nifi.web.dao.UserGroupDAO; -import org.apache.nifi.web.revision.DeleteRevisionTask; -import org.apache.nifi.web.revision.ExpiredRevisionClaimException; -import org.apache.nifi.web.revision.RevisionClaim; -import org.apache.nifi.web.revision.RevisionManager; -import org.apache.nifi.web.revision.RevisionUpdate; -import org.apache.nifi.web.revision.StandardRevisionClaim; -import org.apache.nifi.web.revision.StandardRevisionUpdate; -import org.apache.nifi.web.revision.UpdateRevisionTask; -import org.apache.nifi.web.util.SnippetUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Implementation of NiFiServiceFacade that performs revision checking. - */ -public class StandardNiFiServiceFacade implements NiFiServiceFacade { - private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class); - private static final int VALIDATION_WAIT_MILLIS = 50; - - // nifi core components - private ControllerFacade controllerFacade; - private SnippetUtils snippetUtils; - - // revision manager - private RevisionManager revisionManager; - private BulletinRepository bulletinRepository; - - // data access objects - private ProcessorDAO processorDAO; - private ProcessGroupDAO processGroupDAO; - private RemoteProcessGroupDAO remoteProcessGroupDAO; - private LabelDAO labelDAO; - private FunnelDAO funnelDAO; - private SnippetDAO snippetDAO; - private PortDAO inputPortDAO; - private PortDAO outputPortDAO; - private ConnectionDAO connectionDAO; - private ControllerServiceDAO controllerServiceDAO; - private ReportingTaskDAO reportingTaskDAO; - private TemplateDAO templateDAO; - private UserDAO userDAO; - private UserGroupDAO userGroupDAO; - private AccessPolicyDAO accessPolicyDAO; - private RegistryDAO registryDAO; - private ClusterCoordinator clusterCoordinator; - private HeartbeatMonitor heartbeatMonitor; - private LeaderElectionManager leaderElectionManager; - - // administrative services - private AuditService auditService; - - // flow registry - private FlowRegistryClient flowRegistryClient; - - // properties - private NiFiProperties properties; - private DtoFactory dtoFactory; - private EntityFactory entityFactory; - - private Authorizer authorizer; - - private AuthorizableLookup authorizableLookup; - - // ----------------------------------------- - // Synchronization methods - // ----------------------------------------- - @Override - public void authorizeAccess(final AuthorizeAccess authorizeAccess) { - authorizeAccess.authorize(authorizableLookup); - } - - @Override - public void verifyRevision(final Revision revision, final NiFiUser user) { - final Revision curRevision = revisionManager.getRevision(revision.getComponentId()); - if (revision.equals(curRevision)) { - return; - } - - throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified"); - } - - @Override - public void verifyRevisions(final Set<Revision> revisions, final NiFiUser user) { - for (final Revision revision : revisions) { - verifyRevision(revision, user); - } - } - - @Override - public Set<Revision> getRevisionsFromGroup(final String groupId, final Function<ProcessGroup, Set<String>> getComponents) { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - final Set<String> componentIds = getComponents.apply(group); - return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); - } - - @Override - public Set<Revision> getRevisionsFromSnippet(final String snippetId) { - final Snippet snippet = snippetDAO.getSnippet(snippetId); - final Set<String> componentIds = new HashSet<>(); - componentIds.addAll(snippet.getProcessors().keySet()); - componentIds.addAll(snippet.getFunnels().keySet()); - componentIds.addAll(snippet.getLabels().keySet()); - componentIds.addAll(snippet.getConnections().keySet()); - componentIds.addAll(snippet.getInputPorts().keySet()); - componentIds.addAll(snippet.getOutputPorts().keySet()); - componentIds.addAll(snippet.getProcessGroups().keySet()); - componentIds.addAll(snippet.getRemoteProcessGroups().keySet()); - return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); - } - - // ----------------------------------------- - // Verification Operations - // ----------------------------------------- - - @Override - public void verifyListQueue(final String connectionId) { - connectionDAO.verifyList(connectionId); - } - - @Override - public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) { - connectionDAO.verifyCreate(groupId, connectionDTO); - } - - @Override - public void verifyUpdateConnection(final ConnectionDTO connectionDTO) { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (connectionDAO.hasConnection(connectionDTO.getId())) { - connectionDAO.verifyUpdate(connectionDTO); - } else { - connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO); - } - } - - @Override - public void verifyDeleteConnection(final String connectionId) { - connectionDAO.verifyDelete(connectionId); - } - - @Override - public void verifyDeleteFunnel(final String funnelId) { - funnelDAO.verifyDelete(funnelId); - } - - @Override - public void verifyUpdateInputPort(final PortDTO inputPortDTO) { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (inputPortDAO.hasPort(inputPortDTO.getId())) { - inputPortDAO.verifyUpdate(inputPortDTO); - } - } - - @Override - public void verifyDeleteInputPort(final String inputPortId) { - inputPortDAO.verifyDelete(inputPortId); - } - - @Override - public void verifyUpdateOutputPort(final PortDTO outputPortDTO) { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (outputPortDAO.hasPort(outputPortDTO.getId())) { - outputPortDAO.verifyUpdate(outputPortDTO); - } - } - - @Override - public void verifyDeleteOutputPort(final String outputPortId) { - outputPortDAO.verifyDelete(outputPortId); - } - - @Override - public void verifyCreateProcessor(ProcessorDTO processorDTO) { - processorDAO.verifyCreate(processorDTO); - } - - @Override - public void verifyUpdateProcessor(final ProcessorDTO processorDTO) { - // if group does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (processorDAO.hasProcessor(processorDTO.getId())) { - processorDAO.verifyUpdate(processorDTO); - } else { - verifyCreateProcessor(processorDTO); - } - } - - @Override - public void verifyDeleteProcessor(final String processorId) { - processorDAO.verifyDelete(processorId); - } - - @Override - public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) { - processGroupDAO.verifyScheduleComponents(groupId, state, componentIds); - } - - @Override - public void verifyEnableComponents(String processGroupId, ScheduledState state, Set<String> componentIds) { - processGroupDAO.verifyEnableComponents(processGroupId, state, componentIds); - } - - @Override - public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) { - processGroupDAO.verifyActivateControllerServices(state, serviceIds); - } - - @Override - public void verifyDeleteProcessGroup(final String groupId) { - processGroupDAO.verifyDelete(groupId); - } - - @Override - public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) { - // if remote group does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) { - remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO); - } - } - - @Override - public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); - } - - @Override - public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); - } - - @Override - public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) { - remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId); - } - - @Override - public void verifyCreateControllerService(ControllerServiceDTO controllerServiceDTO) { - controllerServiceDAO.verifyCreate(controllerServiceDTO); - } - - @Override - public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) { - // if service does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) { - controllerServiceDAO.verifyUpdate(controllerServiceDTO); - } else { - verifyCreateControllerService(controllerServiceDTO); - } - } - - @Override - public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { - controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); - } - - @Override - public void verifyDeleteControllerService(final String controllerServiceId) { - controllerServiceDAO.verifyDelete(controllerServiceId); - } - - @Override - public void verifyCreateReportingTask(ReportingTaskDTO reportingTaskDTO) { - reportingTaskDAO.verifyCreate(reportingTaskDTO); - } - - @Override - public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) { - // if tasks does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) { - reportingTaskDAO.verifyUpdate(reportingTaskDTO); - } else { - verifyCreateReportingTask(reportingTaskDTO); - } - } - - @Override - public void verifyDeleteReportingTask(final String reportingTaskId) { - reportingTaskDAO.verifyDelete(reportingTaskId); - } - - // ----------------------------------------- - // Write Operations - // ----------------------------------------- - - @Override - public AccessPolicyEntity updateAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) { - final Authorizable authorizable = authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId()); - final RevisionUpdate<AccessPolicyDTO> snapshot = updateComponent(revision, - authorizable, - () -> accessPolicyDAO.updateAccessPolicy(accessPolicyDTO), - accessPolicy -> { - final Set<TenantEntity> users = accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set<TenantEntity> userGroups = accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); - return dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference); - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizable); - return entityFactory.createAccessPolicyEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public UserEntity updateUser(final Revision revision, final UserDTO userDTO) { - final Authorizable usersAuthorizable = authorizableLookup.getTenant(); - final Set<Group> groups = userGroupDAO.getUserGroupsForUser(userDTO.getId()); - final Set<AccessPolicy> policies = userGroupDAO.getAccessPoliciesForUser(userDTO.getId()); - final RevisionUpdate<UserDTO> snapshot = updateComponent(revision, - usersAuthorizable, - () -> userDAO.updateUser(userDTO), - user -> { - final Set<TenantEntity> tenantEntities = groups.stream().map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set<AccessPolicySummaryEntity> policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - return dtoFactory.createUserDto(user, tenantEntities, policyEntities); - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(usersAuthorizable); - return entityFactory.createUserEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public UserGroupEntity updateUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) { - final Authorizable userGroupsAuthorizable = authorizableLookup.getTenant(); - final Set<AccessPolicy> policies = userGroupDAO.getAccessPoliciesForUserGroup(userGroupDTO.getId()); - final RevisionUpdate<UserGroupDTO> snapshot = updateComponent(revision, - userGroupsAuthorizable, - () -> userGroupDAO.updateUserGroup(userGroupDTO), - userGroup -> { - final Set<TenantEntity> tenantEntities = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set<AccessPolicySummaryEntity> policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - return dtoFactory.createUserGroupDto(userGroup, tenantEntities, policyEntities); - } - ); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(userGroupsAuthorizable); - return entityFactory.createUserGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public ConnectionEntity updateConnection(final Revision revision, final ConnectionDTO connectionDTO) { - final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId()); - - final RevisionUpdate<ConnectionDTO> snapshot = updateComponent( - revision, - connectionNode, - () -> connectionDAO.updateConnection(connectionDTO), - connection -> dtoFactory.createConnectionDto(connection)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectionNode); - final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionNode.getIdentifier())); - return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status); - } - - @Override - public ProcessorEntity updateProcessor(final Revision revision, final ProcessorDTO processorDTO) { - // get the component, ensure we have access to it, and perform the update request - final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); - final RevisionUpdate<ProcessorDTO> snapshot = updateComponent(revision, - processorNode, - () -> processorDAO.updateProcessor(processorDTO), - proc -> { - awaitValidationCompletion(proc); - return dtoFactory.createProcessorDto(proc); - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processorNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processorNode)); - final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorNode.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - private void awaitValidationCompletion(final ComponentNode component) { - component.getValidationStatus(VALIDATION_WAIT_MILLIS, TimeUnit.MILLISECONDS); - } - - @Override - public LabelEntity updateLabel(final Revision revision, final LabelDTO labelDTO) { - final Label labelNode = labelDAO.getLabel(labelDTO.getId()); - final RevisionUpdate<LabelDTO> snapshot = updateComponent(revision, - labelNode, - () -> labelDAO.updateLabel(labelDTO), - label -> dtoFactory.createLabelDto(label)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(labelNode); - return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public FunnelEntity updateFunnel(final Revision revision, final FunnelDTO funnelDTO) { - final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId()); - final RevisionUpdate<FunnelDTO> snapshot = updateComponent(revision, - funnelNode, - () -> funnelDAO.updateFunnel(funnelDTO), - funnel -> dtoFactory.createFunnelDto(funnel)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnelNode); - return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - - /** - * Updates a component with the given revision, using the provided supplier to call - * into the appropriate DAO and the provided function to convert the component into a DTO. - * - * @param revision the current revision - * @param daoUpdate a Supplier that will update the component via the appropriate DAO - * @param dtoCreation a Function to convert a component into a dao - * @param <D> the DTO Type of the updated component - * @param <C> the Component Type of the updated component - * @return A RevisionUpdate that represents the new configuration - */ - private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) { - try { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() { - @Override - public RevisionUpdate<D> update() { - // get the updated component - final C component = daoUpdate.get(); - - // save updated controller - controllerFacade.save(); - - final D dto = dtoCreation.apply(component); - - final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); - final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastModification); - } - }); - - return updatedComponent; - } catch (final ExpiredRevisionClaimException erce) { - throw new InvalidRevisionException("Failed to update component " + authorizable, erce); - } - } - - - @Override - public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set<String> affectedComponentIds) { - // if snippet does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (snippetDAO.hasSnippet(snippetDto.getId())) { - snippetDAO.verifyUpdateSnippetComponent(snippetDto); - } - } - - @Override - public SnippetEntity updateSnippet(final Set<Revision> revisions, final SnippetDTO snippetDto) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions); - - final RevisionUpdate<SnippetDTO> snapshot; - try { - snapshot = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() { - @Override - public RevisionUpdate<SnippetDTO> update() { - // get the updated component - final Snippet snippet = snippetDAO.updateSnippetComponents(snippetDto); - - // drop the snippet - snippetDAO.dropSnippet(snippet.getId()); - - // save updated controller - controllerFacade.save(); - - // increment the revisions - final Set<Revision> updatedRevisions = revisions.stream().map(revision -> { - final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); - return currentRevision.incrementRevision(revision.getClientId()); - }).collect(Collectors.toSet()); - - final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); - return new StandardRevisionUpdate<>(dto, null, updatedRevisions); - } - }); - } catch (final ExpiredRevisionClaimException e) { - throw new InvalidRevisionException("Failed to update Snippet", e); - } - - return entityFactory.createSnippetEntity(snapshot.getComponent()); - } - - @Override - public PortEntity updateInputPort(final Revision revision, final PortDTO inputPortDTO) { - final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId()); - final RevisionUpdate<PortDTO> snapshot = updateComponent(revision, - inputPortNode, - () -> inputPortDAO.updatePort(inputPortDTO), - port -> dtoFactory.createPortDto(port)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPortNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPortNode)); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortNode.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public PortEntity updateOutputPort(final Revision revision, final PortDTO outputPortDTO) { - final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId()); - final RevisionUpdate<PortDTO> snapshot = updateComponent(revision, - outputPortNode, - () -> outputPortDAO.updatePort(outputPortDTO), - port -> dtoFactory.createPortDto(port)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPortNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPortNode), NiFiUserUtils.getNiFiUser()); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortNode.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public RemoteProcessGroupEntity updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) { - final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); - final RevisionUpdate<RemoteProcessGroupDTO> snapshot = updateComponent( - revision, - remoteProcessGroupNode, - () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO), - remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); - final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupNode, - controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroupNode.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort( - final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - - final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); - final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent( - revision, - remoteProcessGroupNode, - () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), - remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions); - } - - @Override - public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort( - final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - - final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); - final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent( - revision, - remoteProcessGroupNode, - () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), - remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions); - } - - @Override - public Set<AffectedComponentDTO> getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { - final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); - if (group == null) { - throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); - } - - final Map<String, String> variableMap = new HashMap<>(); - variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null - .map(VariableEntity::getVariable) - .forEach(var -> variableMap.put(var.getName(), var.getValue())); - - final Set<AffectedComponentDTO> affectedComponentDtos = new HashSet<>(); - - final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap); - for (final String variableName : updatedVariableNames) { - final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName); - - for (final ComponentNode component : affectedComponents) { - if (component instanceof ProcessorNode) { - final ProcessorNode procNode = (ProcessorNode) component; - if (procNode.isRunning()) { - affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode)); - } - } else if (component instanceof ControllerServiceNode) { - final ControllerServiceNode serviceNode = (ControllerServiceNode) component; - if (serviceNode.isActive()) { - affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode)); - } - } else { - throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable"); - } - } - } - - return affectedComponentDtos; - } - - @Override - public Set<AffectedComponentEntity> getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { - final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); - if (group == null) { - throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); - } - - final Map<String, String> variableMap = new HashMap<>(); - variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null - .map(VariableEntity::getVariable) - .forEach(var -> variableMap.put(var.getName(), var.getValue())); - - final Set<AffectedComponentEntity> affectedComponentEntities = new HashSet<>(); - - final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap); - for (final String variableName : updatedVariableNames) { - final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName); - affectedComponentEntities.addAll(dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager)); - } - - return affectedComponentEntities; - } - - private Set<String> getUpdatedVariables(final ProcessGroup group, final Map<String, String> newVariableValues) { - final Set<String> updatedVariableNames = new HashSet<>(); - - final ComponentVariableRegistry registry = group.getVariableRegistry(); - for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) { - final String varName = entry.getKey(); - final String newValue = entry.getValue(); - - final String curValue = registry.getVariableValue(varName); - if (!Objects.equals(newValue, curValue)) { - updatedVariableNames.add(varName); - } - } - - return updatedVariableNames; - } - - - @Override - public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) { - final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); - final RevisionUpdate<VariableRegistryDTO> snapshot = updateComponent(revision, - processGroupNode, - () -> processGroupDAO.updateVariableRegistry(variableRegistryDto), - processGroup -> dtoFactory.createVariableRegistryDto(processGroup, revisionManager)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions); - } - - - @Override - public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) { - final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId()); - final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision, - processGroupNode, - () -> processGroupDAO.updateProcessGroup(processGroupDTO), - processGroup -> dtoFactory.createProcessGroupDto(processGroup)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities); - } - - @Override - public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) { - if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) { - processGroupDAO.verifyUpdate(processGroupDTO); - } - } - - @Override - public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new - UpdateRevisionTask<ScheduleComponentsEntity>() { - @Override - public RevisionUpdate<ScheduleComponentsEntity> update() { - // schedule the components - processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet()); - - // update the revisions - final Map<String, Revision> updatedRevisions = new HashMap<>(); - for (final Revision revision : componentRevisions.values()) { - final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); - updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); - } - - // save - controllerFacade.save(); - - // gather details for response - final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); - entity.setId(processGroupId); - entity.setState(state.name()); - return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); - } - }); - - return updatedComponent.getComponent(); - } - - @Override - public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new - UpdateRevisionTask<ScheduleComponentsEntity>() { - @Override - public RevisionUpdate<ScheduleComponentsEntity> update() { - // schedule the components - processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); - - // update the revisions - final Map<String, Revision> updatedRevisions = new HashMap<>(); - for (final Revision revision : componentRevisions.values()) { - final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); - updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); - } - - // save - controllerFacade.save(); - - // gather details for response - final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); - entity.setId(processGroupId); - entity.setState(state.name()); - return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); - } - }); - - return updatedComponent.getComponent(); - } - - @Override - public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionUpdate<ActivateControllerServicesEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user, - new UpdateRevisionTask<ActivateControllerServicesEntity>() { - @Override - public RevisionUpdate<ActivateControllerServicesEntity> update() { - // schedule the components - processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet()); - - // update the revisions - final Map<String, Revision> updatedRevisions = new HashMap<>(); - for (final Revision revision : serviceRevisions.values()) { - final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); - updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); - } - - // save - controllerFacade.save(); - - // gather details for response - final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity(); - entity.setId(processGroupId); - entity.setState(state.name()); - return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); - } - }); - - return updatedComponent.getComponent(); - } - - - @Override - public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) { - final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = updateComponent( - revision, - controllerFacade, - () -> { - if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) { - controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount()); - } - if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) { - controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount()); - } - - return controllerConfigurationDTO; - }, - controller -> dtoFactory.createControllerConfigurationDto(controllerFacade)); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade); - final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(updatedComponent.getLastModification()); - return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions); - } - - - @Override - public NodeDTO updateNode(final NodeDTO nodeDTO) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - if (user == null) { - throw new WebApplicationException(new Throwable("Unable to access details for current user.")); - } - final String userDn = user.getIdentity(); - - final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeDTO.getNodeId()); - if (nodeId == null) { - throw new UnknownNodeException("No node exists with ID " + nodeDTO.getNodeId()); - } - - - if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { - clusterCoordinator.requestNodeConnect(nodeId, userDn); - } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) { - clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED, - "User " + userDn + " requested that node be offloaded"); - } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { - clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, - "User " + userDn + " requested that node be disconnected from cluster"); - } - - return getNode(nodeId); - } - - @Override - public CounterDTO updateCounter(final String counterId) { - return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId)); - } - - @Override - public void verifyCanClearProcessorState(final String processorId) { - processorDAO.verifyClearState(processorId); - } - - @Override - public void clearProcessorState(final String processorId) { - processorDAO.clearState(processorId); - } - - @Override - public void verifyCanClearControllerServiceState(final String controllerServiceId) { - controllerServiceDAO.verifyClearState(controllerServiceId); - } - - @Override - public void clearControllerServiceState(final String controllerServiceId) { - controllerServiceDAO.clearState(controllerServiceId); - } - - @Override - public void verifyCanClearReportingTaskState(final String reportingTaskId) { - reportingTaskDAO.verifyClearState(reportingTaskId); - } - - @Override - public void clearReportingTaskState(final String reportingTaskId) { - reportingTaskDAO.clearState(reportingTaskId); - } - - @Override - public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionDTO snapshot = deleteComponent( - revision, - connection.getResource(), - () -> connectionDAO.deleteConnection(connectionId), - false, // no policies to remove - dtoFactory.createConnectionDto(connection)); - - return entityFactory.createConnectionEntity(snapshot, null, permissions, null); - } - - @Override - public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) { - return dtoFactory.createDropRequestDTO(connectionDAO.deleteFlowFileDropRequest(connectionId, dropRequestId)); - } - - @Override - public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(connectionId, listingRequestId)); - - // include whether the source and destination are running - if (connection.getSource() != null) { - listRequest.setSourceRunning(connection.getSource().isRunning()); - } - if (connection.getDestination() != null) { - listRequest.setDestinationRunning(connection.getDestination().isRunning()); - } - - return listRequest; - } - - @Override - public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) { - final ProcessorNode processor = processorDAO.getProcessor(processorId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); - final ProcessorDTO snapshot = deleteComponent( - revision, - processor.getResource(), - () -> processorDAO.deleteProcessor(processorId), - true, - dtoFactory.createProcessorDto(processor)); - - return entityFactory.createProcessorEntity(snapshot, null, permissions, operatePermissions, null, null); - } - - @Override - public ProcessorEntity terminateProcessor(final String processorId) { - processorDAO.terminate(processorId); - return getProcessor(processorId); - } - - @Override - public void verifyTerminateProcessor(final String processorId) { - processorDAO.verifyTerminate(processorId); - } - - @Override - public LabelEntity deleteLabel(final Revision revision, final String labelId) { - final Label label = labelDAO.getLabel(labelId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); - final LabelDTO snapshot = deleteComponent( - revision, - label.getResource(), - () -> labelDAO.deleteLabel(labelId), - true, - dtoFactory.createLabelDto(label)); - - return entityFactory.createLabelEntity(snapshot, null, permissions); - } - - @Override - public UserEntity deleteUser(final Revision revision, final String userId) { - final User user = userDAO.getUser(userId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - final Set<TenantEntity> userGroups = user != null ? userGroupDAO.getUserGroupsForUser(userId).stream() - .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null; - final Set<AccessPolicySummaryEntity> policyEntities = user != null ? userGroupDAO.getAccessPoliciesForUser(userId).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()) : null; - - final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userId; - final UserDTO snapshot = deleteComponent( - revision, - new Resource() { - @Override - public String getIdentifier() { - return resourceIdentifier; - } - - @Override - public String getName() { - return resourceIdentifier; - } - - @Override - public String getSafeDescription() { - return "User " + userId; - } - }, - () -> userDAO.deleteUser(userId), - false, // no user specific policies to remove - dtoFactory.createUserDto(user, userGroups, policyEntities)); - - return entityFactory.createUserEntity(snapshot, null, permissions); - } - - @Override - public UserGroupEntity deleteUserGroup(final Revision revision, final String userGroupId) { - final Group userGroup = userGroupDAO.getUserGroup(userGroupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - final Set<TenantEntity> users = userGroup != null ? userGroup.getUsers().stream() - .map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null; - final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - - final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userGroupId; - final UserGroupDTO snapshot = deleteComponent( - revision, - new Resource() { - @Override - public String getIdentifier() { - return resourceIdentifier; - } - - @Override - public String getName() { - return resourceIdentifier; - } - - @Override - public String getSafeDescription() { - return "User Group " + userGroupId; - } - }, - () -> userGroupDAO.deleteUserGroup(userGroupId), - false, // no user group specific policies to remove - dtoFactory.createUserGroupDto(userGroup, users, policyEntities)); - - return entityFactory.createUserGroupEntity(snapshot, null, permissions); - } - - @Override - public AccessPolicyEntity deleteAccessPolicy(final Revision revision, final String accessPolicyId) { - final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyId)); - final Set<TenantEntity> userGroups = accessPolicy != null ? accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null; - final Set<TenantEntity> users = accessPolicy != null ? accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null; - final AccessPolicyDTO snapshot = deleteComponent( - revision, - new Resource() { - @Override - public String getIdentifier() { - return accessPolicy.getResource(); - } - - @Override - public String getName() { - return accessPolicy.getResource(); - } - - @Override - public String getSafeDescription() { - return "Policy " + accessPolicyId; - } - }, - () -> accessPolicyDAO.deleteAccessPolicy(accessPolicyId), - false, // no need to clean up any policies as it's already been removed above - dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference)); - - return entityFactory.createAccessPolicyEntity(snapshot, null, permissions); - } - - @Override - public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) { - final Funnel funnel = funnelDAO.getFunnel(funnelId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); - final FunnelDTO snapshot = deleteComponent( - revision, - funnel.getResource(), - () -> funnelDAO.deleteFunnel(funnelId), - true, - dtoFactory.createFunnelDto(funnel)); - - return entityFactory.createFunnelEntity(snapshot, null, permissions); - } - - /** - * Deletes a component using the Optimistic Locking Manager - * - * @param revision the current revision - * @param resource the resource being removed - * @param deleteAction the action that deletes the component via the appropriate DAO object - * @param cleanUpPolicies whether or not the policies for this resource should be removed as well - not necessary when there are - * no component specific policies or if the policies of the component are inherited - * @return a dto that represents the new configuration - */ - private <D, C> D deleteComponent(final Revision revision, final Resource resource, final Runnable deleteAction, final boolean cleanUpPolicies, final D dto) { - final RevisionClaim claim = new StandardRevisionClaim(revision); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<D>() { - @Override - public D performTask() { - logger.debug("Attempting to delete component {} with claim {}", resource.getIdentifier(), claim); - - // run the delete action - deleteAction.run(); - - // save the flow - controllerFacade.save(); - logger.debug("Deletion of component {} was successful", resource.getIdentifier()); - - if (cleanUpPolicies) { - cleanUpPolicies(resource); - } - - return dto; - } - }); - } - - /** - * Clean up the policies for the specified component resource. - * - * @param componentResource the resource for the component - */ - private void cleanUpPolicies(final Resource componentResource) { - // ensure the authorizer supports configuration - if (accessPolicyDAO.supportsConfigurableAuthorizer()) { - final List<Resource> resources = new ArrayList<>(); - resources.add(componentResource); - resources.add(ResourceFactory.getDataResource(componentResource)); - resources.add(ResourceFactory.getProvenanceDataResource(componentResource)); - resources.add(ResourceFactory.getDataTransferResource(componentResource)); - resources.add(ResourceFactory.getPolicyResource(componentResource)); - - for (final Resource resource : resources) { - for (final RequestAction action : RequestAction.values()) { - try { - // since the component is being deleted, also delete any relevant access policies - final AccessPolicy readPolicy = accessPolicyDAO.getAccessPolicy(action, resource.getIdentifier()); - if (readPolicy != null) { - accessPolicyDAO.deleteAccessPolicy(readPolicy.getIdentifier()); - } - } catch (final Exception e) { - logger.warn(String.format("Unable to remove access policy for %s %s after component removal.", action, resource.getIdentifier()), e); - } - } - } - } - } - - @Override - public void verifyDeleteSnippet(final String snippetId, final Set<String> affectedComponentIds) { - snippetDAO.verifyDeleteSnippetComponents(snippetId); - } - - @Override - public SnippetEntity deleteSnippet(final Set<Revision> revisions, final String snippetId) { - final Snippet snippet = snippetDAO.getSnippet(snippetId); - - // grab the resources in the snippet so we can delete the policies afterwards - final Set<Resource> snippetResources = new HashSet<>(); - snippet.getProcessors().keySet().forEach(id -> snippetResources.add(processorDAO.getProcessor(id).getResource())); - snippet.getInputPorts().keySet().forEach(id -> snippetResources.add(inputPortDAO.getPort(id).getResource())); - snippet.getOutputPorts().keySet().forEach(id -> snippetResources.add(outputPortDAO.getPort(id).getResource())); - snippet.getFunnels().keySet().forEach(id -> snippetResources.add(funnelDAO.getFunnel(id).getResource())); - snippet.getLabels().keySet().forEach(id -> snippetResources.add(labelDAO.getLabel(id).getResource())); - snippet.getRemoteProcessGroups().keySet().forEach(id -> snippetResources.add(remoteProcessGroupDAO.getRemoteProcessGroup(id).getResource())); - snippet.getProcessGroups().keySet().forEach(id -> { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id); - - // add the process group - snippetResources.add(processGroup.getResource()); - - // add each encapsulated component - processGroup.findAllProcessors().forEach(processor -> snippetResources.add(processor.getResource())); - processGroup.findAllInputPorts().forEach(inputPort -> snippetResources.add(inputPort.getResource())); - processGroup.findAllOutputPorts().forEach(outputPort -> snippetResources.add(outputPort.getResource())); - processGroup.findAllFunnels().forEach(funnel -> snippetResources.add(funnel.getResource())); - processGroup.findAllLabels().forEach(label -> snippetResources.add(label.getResource())); - processGroup.findAllProcessGroups().forEach(childGroup -> snippetResources.add(childGroup.getResource())); - processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> snippetResources.add(remoteProcessGroup.getResource())); - processGroup.findAllTemplates().forEach(template -> snippetResources.add(template.getResource())); - processGroup.findAllControllerServices().forEach(controllerService -> snippetResources.add(controllerService.getResource())); - }); - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionClaim claim = new StandardRevisionClaim(revisions); - final SnippetDTO dto = revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<SnippetDTO>() { - @Override - public SnippetDTO performTask() { - // delete the components in the snippet - snippetDAO.deleteSnippetComponents(snippetId); - - // drop the snippet - snippetDAO.dropSnippet(snippetId); - - // save - controllerFacade.save(); - - // create the dto for the snippet that was just removed - return dtoFactory.createSnippetDto(snippet); - } - }); - - // clean up component policies - snippetResources.forEach(resource -> cleanUpPolicies(resource)); - - return entityFactory.createSnippetEntity(dto); - } - - @Override - public PortEntity deleteInputPort(final Revision revision, final String inputPortId) { - final Port port = inputPortDAO.getPort(inputPortId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); - final PortDTO snapshot = deleteComponent( - revision, - port.getResource(), - () -> inputPortDAO.deletePort(inputPortId), - true, - dtoFactory.createPortDto(port)); - - return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null); - } - - @Override - public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) { - final Port port = outputPortDAO.getPort(outputPortId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); - final PortDTO snapshot = deleteComponent( - revision, - port.getResource(), - () -> outputPortDAO.deletePort(outputPortId), - true, - dtoFactory.createPortDto(port)); - - return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null); - } - - @Override - public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - - // grab the resources in the snippet so we can delete the policies afterwards - final Set<Resource> groupResources = new HashSet<>(); - processGroup.findAllProcessors().forEach(processor -> groupResources.add(processor.getResource())); - processGroup.findAllInputPorts().forEach(inputPort -> groupResources.add(inputPort.getResource())); - processGroup.findAllOutputPorts().forEach(outputPort -> groupResources.add(outputPort.getResource())); - processGroup.findAllFunnels().forEach(funnel -> groupResources.add(funnel.getResource())); - processGroup.findAllLabels().forEach(label -> groupResources.add(label.getResource())); - processGroup.findAllProcessGroups().forEach(childGroup -> groupResources.add(childGroup.getResource())); - processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> groupResources.add(remoteProcessGroup.getResource())); - processGroup.findAllTemplates().forEach(template -> groupResources.add(template.getResource())); - processGroup.findAllControllerServices().forEach(controllerService -> groupResources.add(controllerService.getResource())); - - final ProcessGroupDTO snapshot = deleteComponent( - revision, - processGroup.getResource(), - () -> processGroupDAO.deleteProcessGroup(groupId), - true, - dtoFactory.createProcessGroupDto(processGroup)); - - // delete all applicable component policies - groupResources.forEach(groupResource -> cleanUpPolicies(groupResource)); - - return entityFactory.createProcessGroupEntity(snapshot, null, permissions, null, null); - } - - @Override - public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) { - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); - final RemoteProcessGroupDTO snapshot = deleteComponent( - revision, - remoteProcessGroup.getResource(), - () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId), - true, - dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - - return entityFactory.createRemoteProcessGroupEntity(snapshot, null, permissions, operatePermissions, null, null); - } - - @Override - public void deleteTemplate(final String id) { - // delete the template and save the flow - templateDAO.deleteTemplate(id); - controllerFacade.save(); - } - - @Override - public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) { - final RevisionUpdate<ConnectionDTO> snapshot = createComponent( - revision, - connectionDTO, - () -> connectionDAO.createConnection(groupId, connectionDTO), - connection -> dtoFactory.createConnectionDto(connection)); - - final Connection connection = connectionDAO.getConnection(connectionDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId())); - return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status); - } - - @Override - public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) { - return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId)); - } - - @Override - public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(connectionId, listingRequestId)); - - // include whether the source and destination are running - if (connection.getSource() != null) { - listRequest.setSourceRunning(connection.getSource().isRunning()); - } - if (connection.getDestination() != null) { - listRequest.setDestinationRunning(connection.getDestination().isRunning()); - } - - return listRequest; - } - - @Override - public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { - final RevisionUpdate<ProcessorDTO> snapshot = createComponent( - revision, - processorDTO, - () -> processorDAO.createProcessor(groupId, processorDTO), - processor -> { - awaitValidationCompletion(processor); - return dtoFactory.createProcessorDto(processor); - }); - - final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); - final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) { - final RevisionUpdate<LabelDTO> snapshot = createComponent( - revision, - labelDTO, - () -> labelDAO.createLabel(groupId, labelDTO), - label -> dtoFactory.createLabelDto(label)); - - final Label label = labelDAO.getLabel(labelDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); - return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - /** - * Creates a component using the optimistic locking manager. - * - * @param componentDto the DTO that will be used to create the component - * @param daoCreation A Supplier that will create the NiFi Component to use - * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO - * @param <D> the DTO Type - * @param <C> the NiFi Component Type - * @return a RevisionUpdate that represents the updated configuration - */ - private <D, C> RevisionUpdate<D> createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier<C> daoCreation, final Function<C, D> dtoCreation) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // read lock on the containing group - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = new StandardRevisionClaim(revision); - - // update revision through revision manager - return revisionManager.updateRevision(claim, user, () -> { - // add the component - final C component = daoCreation.get(); - - // save the flow - controllerFacade.save(); - - final D dto = dtoCreation.apply(component); - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastMod); - }); - } - - @Override - public BulletinEntity createBulletin(final BulletinDTO bulletinDTO, final Boolean canRead){ - final Bulletin bulletin = BulletinFactory.createBulletin(bulletinDTO.getCategory(),bulletinDTO.getLevel(),bulletinDTO.getMessage()); - bulletinRepository.addBulletin(bulletin); - return entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin),canRead); - } - - @Override - public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) { - final RevisionUpdate<FunnelDTO> snapshot = createComponent( - revision, - funnelDTO, - () -> funnelDAO.createFunnel(groupId, funnelDTO), - funnel -> dtoFactory.createFunnelDto(funnel)); - - final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); - return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); - } - - @Override - public AccessPolicyEntity createAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) { - final Authorizable tenantAuthorizable = authorizableLookup.getTenant(); - final String creator = NiFiUserUtils.getNiFiUserIdentity(); - - final AccessPolicy newAccessPolicy = accessPolicyDAO.createAccessPolicy(accessPolicyDTO); - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(newAccessPolicy.getResource()); - final AccessPolicyDTO newAccessPolicyDto = dtoFactory.createAccessPolicyDto(newAccessPolicy, - newAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()), - newAccessPolicy.getUsers().stream().map(userId -> { - final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId)); - return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(userDAO.getUser(userId)), userRevision, - dtoFactory.createPermissionsDto(tenantAuthorizable)); - }).collect(Collectors.toSet()), componentReference); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId())); - return entityFactory.createAccessPolicyEntity(newAccessPolicyDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); - } - - @Override - public UserEntity createUser(final Revision revision, final UserDTO userDTO) { - final String creator = NiFiUserUtils.getNiFiUserIdentity(); - final User newUser = userDAO.createUser(userDTO); - final Set<TenantEntity> tenantEntities = userGroupDAO.getUserGroupsForUser(newUser.getIdentifier()).stream() - .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUser(newUser.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - final UserDTO newUserDto = dtoFactory.createUserDto(newUser, tenantEntities, policyEntities); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - return entityFactory.createUserEntity(newUserDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); - } - - private ComponentReferenceEntity createComponentReferenceEntity(final String resource) { - ComponentReferenceEntity componentReferenceEntity = null; - try { - // get the component authorizable - Authorizable componentAuthorizable = authorizableLookup.getAuthorizableFromResource(resource); - - // if this represents an authorizable whose policy permissions are enforced through the base resource, - // get the underlying base authorizable for the component reference - if (componentAuthorizable instanceof EnforcePolicyPermissionsThroughBaseResource) { - componentAuthorizable = ((EnforcePolicyPermissionsThroughBaseResource) componentAuthorizable).getBaseAuthorizable(); - } - - final ComponentReferenceDTO componentReference = dtoFactory.createComponentReferenceDto(componentAuthorizable); - if (componentReference != null) { - final PermissionsDTO componentReferencePermissions = dtoFactory.createPermissionsDto(componentAuthorizable); - final RevisionDTO componentReferenceRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(componentReference.getId())); - componentReferenceEntity = entityFactory.createComponentReferenceEntity(componentReference, componentReferenceRevision, componentReferencePermissions); - } - } catch (final ResourceNotFoundException e) { - // component not found for the specified resource - } - - return componentReferenceEntity; - } - - private AccessPolicySummaryEntity createAccessPolicySummaryEntity(final AccessPolicy ap) { - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(ap.getResource()); - final AccessPolicySummaryDTO apSummary = dtoFactory.createAccessPolicySummaryDto(ap, componentReference); - final PermissionsDTO apPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(ap.getIdentifier())); - final RevisionDTO apRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(ap.getIdentifier())); - return entityFactory.createAccessPolicySummaryEntity(apSummary, apRevision, apPermissions); - } - - @Override - public UserGroupEntity createUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) { - final String creator = NiFiUserUtils.getNiFiUserIdentity(); - final Group newUserGroup = userGroupDAO.createUserGroup(userGroupDTO); - final Set<TenantEntity> tenantEntities = newUserGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); - final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(newUserGroup.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - final UserGroupDTO newUserGroupDto = dtoFactory.createUserGroupDto(newUserGroup, tenantEntities, policyEntities); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - return entityFactory.createUserGroupEntity(newUserGroupDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); - } - - private void validateSnippetContents(final FlowSnippetDTO flow) { - // validate any processors - if (flow.getProcessors() != null) { - for (final ProcessorDTO processorDTO : flow.getProcessors()) { - final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); - processorDTO.setValidationStatus(processorNode.getValidationStatus().name()); - - final Collection<ValidationResult> validationErrors = processorNode.getValidationErrors(); - if (validationErrors != null && !validationErrors.isEmpty()) { - final List<String> errors = new ArrayList<>(); - for (final ValidationResult validationResult : validationErrors) { - errors.add(validationResult.toString()); - } - processorDTO.setValidationErrors(errors); - } - } - } - - if (flow.getInputPorts() != null) { - for (final PortDTO portDTO : flow.getInputPorts()) { - final Port port = inputPortDAO.getPort(portDTO.getId()); - final Collection<ValidationResult> validationErrors = port.getValidationErrors(); - if (validationErrors != null && !validationErrors.isEmpty()) { - final List<String> errors = new ArrayList<>(); - for (final ValidationResult validationResult : validationErrors) { - errors.add(validationResult.toString()); - } - portDTO.setValidationErrors(errors); - } - } - } - - if (flow.getOutputPorts() != null) { - for (final PortDTO portDTO : flow.getOutputPorts()) { - final Port port = outputPortDAO.getPort(portDTO.getId()); - final Collection<ValidationResult> validationErrors = port.getValidationErrors(); - if (validationErrors != null && !validationErrors.isEmpty()) { - final List<String> errors = new ArrayList<>(); - for (final ValidationResult validationResult : validationErrors) { - errors.add(validationResult.toString()); - } - portDTO.setValidationErrors(errors); - } - } - } - - // get any remote process group issues - if (flow.getRemoteProcessGroups() != null) { - for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) { - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); - - if (remoteProcessGroup.getAuthorizationIssue() != null) { - remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue())); - } - } - } - } - - @Override - public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) { - // create the new snippet - final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); - - // save the flow - controllerFacade.save(); - - // drop the snippet - snippetDAO.dropSnippet(snippetId); - - // post process new flow snippet - final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); - - final FlowEntity flowEntity = new FlowEntity(); - flowEntity.setFlow(flowDto); - return flowEntity; - } - - @Override - public SnippetEntity createSnippet(final SnippetDTO snippetDTO) { - // add the component - final Snippet snippet = snippetDAO.createSnippet(snippetDTO); - - // save the flow - controllerFacade.save(); - - final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); - final RevisionUpdate<SnippetDTO> snapshot = new StandardRevisionUpdate<>(dto, null); - - return entityFactory.createSnippetEntity(snapshot.getComponent()); - } - - @Override - public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) { - final RevisionUpdate<PortDTO> snapshot = createComponent( - revision, - inputPortDTO, - () -> inputPortDAO.createPort(groupId, inputPortDTO), - port -> dtoFactory.createPortDto(port)); - - final Port port = inputPortDAO.getPort(inputPortDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) { - final RevisionUpdate<PortDTO> snapshot = createComponent( - revision, - outputPortDTO, - () -> outputPortDAO.createPort(groupId, outputPortDTO), - port -> dtoFactory.createPortDto(port)); - - final Port port = outputPortDAO.getPort(outputPortDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public ProcessGroupEntity createProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) { - final RevisionUpdate<ProcessGroupDTO> snapshot = createComponent( - revision, - processGroupDTO, - () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO), - processGroup -> dtoFactory.createProcessGroupDto(processGroup)); - - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities); - } - - @Override - public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) { - final RevisionUpdate<RemoteProcessGroupDTO> snapshot = createComponent( - revision, - remoteProcessGroupDTO, - () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO), - remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); - final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), - permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) { - final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); - RemoteGroupPort port = rpg.getInputPort(remotePortId); - if (port != null) { - return port.hasIncomingConnection(); - } - - port = rpg.getOutputPort(remotePortId); - if (port != null) { - return !port.getConnections().isEmpty(); - } - - throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId); - } - - @Override - public void verifyCanAddTemplate(String groupId, String name) { - templateDAO.verifyCanAddTemplate(name, groupId); - } - - @Override - public void verifyComponentTypes(FlowSnippetDTO snippet) { - templateDAO.verifyComponentTypes(snippet); - } - - @Override - public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) { - controllerFacade.verifyComponentTypes(versionedGroup); - } - - @Override - public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - verifyImportProcessGroup(versionControlInfo, contents, group); - } - - private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) { - if (group == null) { - return; - } - - final VersionControlInformation vci = group.getVersionControlInformation(); - if (vci != null) { - // Note that we do not compare the Registry ID here because there could be two registry clients - // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance).. - if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier()) - && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) { - - throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. " - + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A."); - } - } - - final Set<VersionedProcessGroup> childGroups = contents.getProcessGroups(); - if (childGroups != null) { - for (final VersionedProcessGroup childGroup : childGroups) { - final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates(); - if (childCoordinates != null) { - final VersionControlInformationDTO childVci = new VersionControlInformationDTO(); - childVci.setBucketId(childCoordinates.getBucketId()); - childVci.setFlowId(childCoordinates.getFlowId()); - verifyImportProcessGroup(childVci, childGroup, group); - } - } - } - - verifyImportProcessGroup(vciDto, contents, group.getParent()); - } - - @Override - public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) { - // get the specified snippet - final Snippet snippet = snippetDAO.getSnippet(snippetId); - - // create the template - final TemplateDTO templateDTO = new TemplateDTO(); - templateDTO.setName(name); - templateDTO.setDescription(description); - templateDTO.setTimestamp(new Date()); - templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true, true)); - templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION); - - // set the id based on the specified seed - final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); - templateDTO.setId(uuid); - - // create the template - final Template template = templateDAO.createTemplate(templateDTO, groupId); - - // drop the snippet - snippetDAO.dropSnippet(snippetId); - - // save the flow - controllerFacade.save(); - - return dtoFactory.createTemplateDTO(template); - } - - /** - * Ensures default values are populated for all components in this snippet. This is necessary to handle old templates without default values - * and when existing properties have default values introduced. - * - * @param snippet snippet - */ - private void ensureDefaultPropertyValuesArePopulated(final FlowSnippetDTO snippet) { - if (snippet != null) { - if (snippet.getControllerServices() != null) { - snippet.getControllerServices().forEach(dto -> { - if (dto.getProperties() == null) { - dto.setProperties(new LinkedHashMap<>()); - } - - try { - final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle()); - configurableComponent.getPropertyDescriptors().forEach(descriptor -> { - if (dto.getProperties().get(descriptor.getName()) == null) { - dto.getProperties().put(descriptor.getName(), descriptor.getDefaultValue()); - } - }); - } catch (final Exception e) { - logger.warn(String.format("Unable to create ControllerService of type %s to populate default values.", dto.getType())); - } - }); - } - - if (snippet.getProcessors() != null) { - snippet.getProcessors().forEach(dto -> { - if (dto.getConfig() == null) { - dto.setConfig(new ProcessorConfigDTO()); - } - - final ProcessorConfigDTO config = dto.getConfig(); - if (config.getProperties() == null) { - config.setProperties(new LinkedHashMap<>()); - } - - try { - final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle()); - configurableComponent.getPropertyDescriptors().forEach(descriptor -> { - if (config.getProperties().get(descriptor.getName()) == null) { - config.getProperties().put(descriptor.getName(), descriptor.getDefaultValue()); - } - }); - } catch (final Exception e) { - logger.warn(String.format("Unable to create Processor of type %s to populate default values.", dto.getType())); - } - }); - } - - if (snippet.getProcessGroups() != null) { - snippet.getProcessGroups().forEach(processGroup -> { - ensureDefaultPropertyValuesArePopulated(processGroup.getContents()); - }); - } - } - } - - @Override - public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional<String> idGenerationSeed) { - // ensure id is set - final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); - templateDTO.setId(uuid); - - // mark the timestamp - templateDTO.setTimestamp(new Date()); - - // ensure default values are populated - ensureDefaultPropertyValuesArePopulated(templateDTO.getSnippet()); - - // import the template - final Template template = templateDAO.importTemplate(templateDTO, groupId); - - // save the flow - controllerFacade.save(); - - // return the template dto - return dtoFactory.createTemplateDTO(template); - } - - /** - * Post processes a new flow snippet including validation, removing the snippet, and DTO conversion. - * - * @param groupId group id - * @param snippet snippet - * @return flow dto - */ - private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) { - // validate the new snippet - validateSnippetContents(snippet); - - // identify all components added - final Set<String> identifiers = new HashSet<>(); - snippet.getProcessors().stream() - .map(proc -> proc.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getConnections().stream() - .map(conn -> conn.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getInputPorts().stream() - .map(port -> port.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getOutputPorts().stream() - .map(port -> port.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getProcessGroups().stream() - .map(group -> group.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .map(remoteGroup -> remoteGroup.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null) - .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream()) - .map(remoteInputPort -> remoteInputPort.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null) - .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream()) - .map(remoteOutputPort -> remoteOutputPort.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getLabels().stream() - .map(label -> label.getId()) - .forEach(id -> identifiers.add(id)); - - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); - return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins); - } - - @Override - public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateEncodingVersion, - final FlowSnippetDTO requestSnippet, final String idGenerationSeed) { - - // instantiate the template - there is no need to make another copy of the flow snippet since the actual template - // was copied and this dto is only used to instantiate it's components (which as already completed) - final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateEncodingVersion, requestSnippet, idGenerationSeed); - - // save the flow - controllerFacade.save(); - - // post process the new flow snippet - final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); - - final FlowEntity flowEntity = new FlowEntity(); - flowEntity.setFlow(flowDto); - return flowEntity; - } - - @Override - public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) { - controllerServiceDTO.setParentGroupId(groupId); - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = new StandardRevisionClaim(revision); - - final RevisionUpdate<ControllerServiceDTO> snapshot; - if (groupId == null) { - // update revision through revision manager - snapshot = revisionManager.updateRevision(claim, user, () -> { - // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock - // on the group. The Controller Service may or may not have a Process Group (it won't if it's controller-scoped). - final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); - controllerFacade.save(); - - awaitValidationCompletion(controllerService); - final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); - - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastMod); - }); - } else { - snapshot = revisionManager.updateRevision(claim, user, () -> { - final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); - controllerFacade.save(); - - awaitValidationCompletion(controllerService); - final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); - - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastMod); - }); - } - - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); - } - - @Override - public ControllerServiceEntity updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) { - // get the component, ensure we have access to it, and perform the update request - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); - final RevisionUpdate<ControllerServiceDTO> snapshot = updateComponent(revision, - controllerService, - () -> controllerServiceDAO.updateControllerService(controllerServiceDTO), - cs -> { - awaitValidationCompletion(cs); - final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(cs); - final ControllerServiceReference ref = controllerService.getReferences(); - final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = - createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerService.getIdentifier())); - dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents()); - return dto; - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); - } - - - @Override - public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents( - final Map<String, Revision> referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { - - final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values()); - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, user, - new UpdateRevisionTask<ControllerServiceReferencingComponentsEntity>() { - @Override - public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() { - final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); - final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences(); - - // get the revisions of the updated components - final Map<String, Revision> updatedRevisions = new HashMap<>(); - for (final ComponentNode component : updated) { - final Revision currentRevision = revisionManager.getRevision(component.getIdentifier()); - final Revision requestRevision = referenceRevisions.get(component.getIdentifier()); - updatedRevisions.put(component.getIdentifier(), currentRevision.incrementRevision(requestRevision.getClientId())); - } - - // ensure the revision for all referencing components is included regardless of whether they were updated in this request - for (final ComponentNode component : updatedReference.findRecursiveReferences(ComponentNode.class)) { - updatedRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); - } - - final ControllerServiceReferencingComponentsEntity entity = createControllerServiceReferencingComponentsEntity(updatedReference, updatedRevisions); - return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); - } - }); - - return update.getComponent(); - } - - /** - * Finds the identifiers for all components referencing a ControllerService. - * - * @param reference ControllerServiceReference - * @param visited ControllerServices we've already visited - */ - private void findControllerServiceReferencingComponentIdentifiers(final ControllerServiceReference reference, final Set<ControllerServiceNode> visited) { - for (final ComponentNode component : reference.getReferencingComponents()) { - - // if this is a ControllerService consider it's referencing components - if (component instanceof ControllerServiceNode) { - final ControllerServiceNode node = (ControllerServiceNode) component; - if (!visited.contains(node)) { - visited.add(node); - findControllerServiceReferencingComponentIdentifiers(node.getReferences(), visited); - } - } - } - } - - /** - * Creates entities for components referencing a ControllerService using their current revision. - * - * @param reference ControllerServiceReference - * @return The entity - */ - private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set<String> lockedIds) { - final Set<ControllerServiceNode> visited = new HashSet<>(); - visited.add(reference.getReferencedComponent()); - findControllerServiceReferencingComponentIdentifiers(reference, visited); - - final Map<String, Revision> referencingRevisions = new HashMap<>(); - for (final ComponentNode component : reference.getReferencingComponents()) { - referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); - } - - return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions); - } - - /** - * Creates entities for components referencing a ControllerService using the specified revisions. - * - * @param reference ControllerServiceReference - * @param revisions The revisions - * @return The entity - */ - private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity( - final ControllerServiceReference reference, final Map<String, Revision> revisions) { - final Set<ControllerServiceNode> visited = new HashSet<>(); - visited.add(reference.getReferencedComponent()); - return createControllerServiceReferencingComponentsEntity(reference, revisions, visited); - } - - /** - * Creates entities for components referencing a ControllerServcie using the specified revisions. - * - * @param reference ControllerServiceReference - * @param revisions The revisions - * @param visited Which services we've already considered (in case of cycle) - * @return The entity - */ - private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity( - final ControllerServiceReference reference, final Map<String, Revision> revisions, final Set<ControllerServiceNode> visited) { - - final String modifier = NiFiUserUtils.getNiFiUserIdentity(); - final Set<ComponentNode> referencingComponents = reference.getReferencingComponents(); - - final Set<ControllerServiceReferencingComponentEntity> componentEntities = new HashSet<>(); - for (final ComponentNode refComponent : referencingComponents) { - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(refComponent); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(refComponent)); - - final Revision revision = revisions.get(refComponent.getIdentifier()); - final FlowModification flowMod = new FlowModification(revision, modifier); - final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(flowMod); - final ControllerServiceReferencingComponentDTO dto = dtoFactory.createControllerServiceReferencingComponentDTO(refComponent); - - if (refComponent instanceof ControllerServiceNode) { - final ControllerServiceNode node = (ControllerServiceNode) refComponent; - - // indicate if we've hit a cycle - dto.setReferenceCycle(visited.contains(node)); - - // mark node as visited before building the reference cycle - visited.add(node); - - // if we haven't encountered this service before include it's referencing components - if (!dto.getReferenceCycle()) { - final ControllerServiceReference refReferences = node.getReferences(); - final Map<String, Revision> referencingRevisions = new HashMap<>(revisions); - for (final ComponentNode component : refReferences.getReferencingComponents()) { - referencingRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); - } - final ControllerServiceReferencingComponentsEntity references = createControllerServiceReferencingComponentsEntity(refReferences, referencingRevisions, visited); - dto.setReferencingComponents(references.getControllerServiceReferencingComponents()); - } - } - - componentEntities.add(entityFactory.createControllerServiceReferencingComponentEntity(refComponent.getIdentifier(), dto, revisionDto, permissions, operatePermissions)); - } - - final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity(); - entity.setControllerServiceReferencingComponents(componentEntities); - return entity; - } - - @Override - public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) { - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); - final ControllerServiceDTO snapshot = deleteComponent( - revision, - controllerService.getResource(), - () -> controllerServiceDAO.deleteControllerService(controllerServiceId), - true, - dtoFactory.createControllerServiceDto(controllerService)); - - return entityFactory.createControllerServiceEntity(snapshot, null, permissions, operatePermissions, null); - } - - - @Override - public RegistryClientEntity createRegistryClient(Revision revision, RegistryDTO registryDTO) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = new StandardRevisionClaim(revision); - - // update revision through revision manager - final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(claim, user, () -> { - // add the component - final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO); - - // save the flow - controllerFacade.save(); - - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(registry, lastMod); - }); - - final FlowRegistry registry = revisionUpdate.getComponent(); - return createRegistryClientEntity(registry); - } - - @Override - public RegistryClientEntity getRegistryClient(final String registryId) { - final FlowRegistry registry = registryDAO.getFlowRegistry(registryId); - return createRegistryClientEntity(registry); - } - - private RegistryClientEntity createRegistryClientEntity(final FlowRegistry flowRegistry) { - if (flowRegistry == null) { - return null; - } - - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(flowRegistry.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getController()); - final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry); - - return entityFactory.createRegistryClientEntity(dto, revision, permissions); - } - - private VersionedFlowEntity createVersionedFlowEntity(final String registryId, final VersionedFlow versionedFlow) { - if (versionedFlow == null) { - return null; - } - - final VersionedFlowDTO dto = new VersionedFlowDTO(); - dto.setRegistryId(registryId); - dto.setBucketId(versionedFlow.getBucketIdentifier()); - dto.setFlowId(versionedFlow.getIdentifier()); - dto.setFlowName(versionedFlow.getName()); - dto.setDescription(versionedFlow.getDescription()); - - final VersionedFlowEntity entity = new VersionedFlowEntity(); - entity.setVersionedFlow(dto); - - return entity; - } - - private VersionedFlowSnapshotMetadataEntity createVersionedFlowSnapshotMetadataEntity(final String registryId, final VersionedFlowSnapshotMetadata metadata) { - if (metadata == null) { - return null; - } - - final VersionedFlowSnapshotMetadataEntity entity = new VersionedFlowSnapshotMetadataEntity(); - entity.setRegistryId(registryId); - entity.setVersionedFlowMetadata(metadata); - - return entity; - } - - @Override - public Set<RegistryClientEntity> getRegistryClients() { - return registryDAO.getFlowRegistries().stream() - .map(this::createRegistryClientEntity) - .collect(Collectors.toSet()); - } - - @Override - public Set<RegistryEntity> getRegistriesForUser(final NiFiUser user) { - return registryDAO.getFlowRegistriesForUser(user).stream() - .map(flowRegistry -> entityFactory.createRegistryEntity(dtoFactory.createRegistryDto(flowRegistry))) - .collect(Collectors.toSet()); - } - - @Override - public Set<BucketEntity> getBucketsForUser(final String registryId, final NiFiUser user) { - return registryDAO.getBucketsForUser(registryId, user).stream() - .map(bucket -> { - if (bucket == null) { - return null; - } - - final BucketDTO dto = new BucketDTO(); - dto.setId(bucket.getIdentifier()); - dto.setName(bucket.getName()); - dto.setDescription(bucket.getDescription()); - dto.setCreated(bucket.getCreatedTimestamp()); - - final Permissions regPermissions = bucket.getPermissions(); - final PermissionsDTO permissions = new PermissionsDTO(); - permissions.setCanRead(regPermissions.getCanRead()); - permissions.setCanWrite(regPermissions.getCanWrite()); - - return entityFactory.createBucketEntity(dto, permissions); - }) - .collect(Collectors.toSet()); - } - - @Override - public Set<VersionedFlowEntity> getFlowsForUser(String registryId, String bucketId, NiFiUser user) { - return registryDAO.getFlowsForUser(registryId, bucketId, user).stream() - .map(vf -> createVersionedFlowEntity(registryId, vf)) - .collect(Collectors.toSet()); - } - - @Override - public Set<VersionedFlowSnapshotMetadataEntity> getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) { - return registryDAO.getFlowVersionsForUser(registryId, bucketId, flowId, user).stream() - .map(md -> createVersionedFlowSnapshotMetadataEntity(registryId, md)) - .collect(Collectors.toSet()); - } - - @Override - public RegistryClientEntity updateRegistryClient(Revision revision, RegistryDTO registryDTO) { - final RevisionClaim revisionClaim = new StandardRevisionClaim(revision); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId()); - final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> { - final boolean duplicateName = registryDAO.getFlowRegistries().stream() - .anyMatch(reg -> reg.getName().equals(registryDTO.getName()) && !reg.getIdentifier().equals(registryDTO.getId())); - - if (duplicateName) { - throw new IllegalStateException("Cannot update Flow Registry because a Flow Registry already exists with the name " + registryDTO.getName()); - } - - registry.setDescription(registryDTO.getDescription()); - registry.setName(registryDTO.getName()); - registry.setURL(registryDTO.getUri()); - - controllerFacade.save(); - - final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); - final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); - - return new StandardRevisionUpdate<>(registry, lastModification); - }); - - final FlowRegistry updatedReg = revisionUpdate.getComponent(); - return createRegistryClientEntity(updatedReg); - } - - @Override - public void verifyDeleteRegistry(String registryId) { - processGroupDAO.verifyDeleteFlowRegistry(registryId); - } - - @Override - public RegistryClientEntity deleteRegistryClient(final Revision revision, final String registryId) { - final RevisionClaim claim = new StandardRevisionClaim(revision); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> { - final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId); - controllerFacade.save(); - return reg; - }); - - return createRegistryClientEntity(registry); - } - - @Override - public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = new StandardRevisionClaim(revision); - - // update revision through revision manager - final RevisionUpdate<ReportingTaskDTO> snapshot = revisionManager.updateRevision(claim, user, () -> { - // create the reporting task - final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO); - - // save the update - controllerFacade.save(); - awaitValidationCompletion(reportingTask); - - final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastMod); - }); - - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); - } - - @Override - public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { - // get the component, ensure we have access to it, and perform the update request - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); - final RevisionUpdate<ReportingTaskDTO> snapshot = updateComponent(revision, - reportingTask, - () -> reportingTaskDAO.updateReportingTask(reportingTaskDTO), - rt -> { - awaitValidationCompletion(rt); - return dtoFactory.createReportingTaskDto(rt); - }); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); - } - - @Override - public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) { - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); - final ReportingTaskDTO snapshot = deleteComponent( - revision, - reportingTask.getResource(), - () -> reportingTaskDAO.deleteReportingTask(reportingTaskId), - true, - dtoFactory.createReportingTaskDto(reportingTask)); - - return entityFactory.createReportingTaskEntity(snapshot, null, permissions, operatePermissions, null); - } - - @Override - public void deleteActions(final Date endDate) { - // get the user from the request - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - if (user == null) { - throw new WebApplicationException(new Throwable("Unable to access details for current user.")); - } - - // create the purge details - final FlowChangePurgeDetails details = new FlowChangePurgeDetails(); - details.setEndDate(endDate); - - // create a purge action to record that records are being removed - final FlowChangeAction purgeAction = new FlowChangeAction(); - purgeAction.setUserIdentity(user.getIdentity()); - purgeAction.setOperation(Operation.Purge); - purgeAction.setTimestamp(new Date()); - purgeAction.setSourceId("Flow Controller"); - purgeAction.setSourceName("History"); - purgeAction.setSourceType(Component.Controller); - purgeAction.setActionDetails(details); - - // purge corresponding actions - auditService.purgeActions(endDate, purgeAction); - } - - @Override - public ProvenanceDTO submitProvenance(final ProvenanceDTO query) { - return controllerFacade.submitProvenance(query); - } - - @Override - public void deleteProvenance(final String queryId) { - controllerFacade.deleteProvenanceQuery(queryId); - } - - @Override - public LineageDTO submitLineage(final LineageDTO lineage) { - return controllerFacade.submitLineage(lineage); - } - - @Override - public void deleteLineage(final String lineageId) { - controllerFacade.deleteLineage(lineageId); - } - - @Override - public ProvenanceEventDTO submitReplay(final Long eventId) { - return controllerFacade.submitReplay(eventId); - } - - // ----------------------------------------- - // Read Operations - // ----------------------------------------- - - @Override - public SearchResultsDTO searchController(final String query) { - return controllerFacade.search(query); - } - - @Override - public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) { - return connectionDAO.getContent(connectionId, flowFileUuid, uri); - } - - @Override - public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) { - return controllerFacade.getContent(eventId, uri, contentDirection); - } - - @Override - public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) { - return controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults); - } - - @Override - public LineageDTO getLineage(final String lineageId) { - return controllerFacade.getLineage(lineageId); - } - - @Override - public ProvenanceOptionsDTO getProvenanceSearchOptions() { - return controllerFacade.getProvenanceSearchOptions(); - } - - @Override - public ProvenanceEventDTO getProvenanceEvent(final Long id) { - return controllerFacade.getProvenanceEvent(id); - } - - @Override - public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - final ProcessGroupStatusDTO dto = dtoFactory.createProcessGroupStatusDto(processGroup, controllerFacade.getProcessGroupStatus(groupId)); - - // prune the response as necessary - if (!recursive) { - pruneChildGroups(dto.getAggregateSnapshot()); - if (dto.getNodeSnapshots() != null) { - for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : dto.getNodeSnapshots()) { - pruneChildGroups(nodeSnapshot.getStatusSnapshot()); - } - } - } - - return entityFactory.createProcessGroupStatusEntity(dto, permissions); - } - - private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) { - for (final ProcessGroupStatusSnapshotEntity childProcessGroupStatusEntity : snapshot.getProcessGroupStatusSnapshots()) { - final ProcessGroupStatusSnapshotDTO childProcessGroupStatus = childProcessGroupStatusEntity.getProcessGroupStatusSnapshot(); - childProcessGroupStatus.setConnectionStatusSnapshots(null); - childProcessGroupStatus.setProcessGroupStatusSnapshots(null); - childProcessGroupStatus.setInputPortStatusSnapshots(null); - childProcessGroupStatus.setOutputPortStatusSnapshots(null); - childProcessGroupStatus.setProcessorStatusSnapshots(null); - childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null); - } - } - - @Override - public ControllerStatusDTO getControllerStatus() { - return controllerFacade.getControllerStatus(); - } - - @Override - public ComponentStateDTO getProcessorState(final String processorId) { - final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; - final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); - - // processor will be non null as it was already found when getting the state - final ProcessorNode processor = processorDAO.getProcessor(processorId); - return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState); - } - - @Override - public ComponentStateDTO getControllerServiceState(final String controllerServiceId) { - final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; - final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); - - // controller service will be non null as it was already found when getting the state - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState); - } - - @Override - public ComponentStateDTO getReportingTaskState(final String reportingTaskId) { - final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; - final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); - - // reporting task will be non null as it was already found when getting the state - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); - return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState); - } - - @Override - public CountersDTO getCounters() { - final List<Counter> counters = controllerFacade.getCounters(); - final Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size()); - for (final Counter counter : counters) { - counterDTOs.add(dtoFactory.createCounterDto(counter)); - } - - final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs); - final CountersDTO countersDto = new CountersDTO(); - countersDto.setAggregateSnapshot(snapshotDto); - - return countersDto; - } - - private ConnectionEntity createConnectionEntity(final Connection connection) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier())); - return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, permissions, status); - } - - @Override - public Set<ConnectionEntity> getConnections(final String groupId) { - final Set<Connection> connections = connectionDAO.getConnections(groupId); - return connections.stream() - .map(connection -> createConnectionEntity(connection)) - .collect(Collectors.toSet()); - } - - @Override - public ConnectionEntity getConnection(final String connectionId) { - final Connection connection = connectionDAO.getConnection(connectionId); - return createConnectionEntity(connection); - } - - @Override - public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) { - return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId)); - } - - @Override - public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(connectionId, listingRequestId)); - - // include whether the source and destination are running - if (connection.getSource() != null) { - listRequest.setSourceRunning(connection.getSource().isRunning()); - } - if (connection.getDestination() != null) { - listRequest.setDestinationRunning(connection.getDestination().isRunning()); - } - - return listRequest; - } - - @Override - public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) { - return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid)); - } - - @Override - public ConnectionStatusEntity getConnectionStatus(final String connectionId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionStatusDTO dto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId)); - return entityFactory.createConnectionStatusEntity(dto, permissions); - } - - @Override - public StatusHistoryEntity getConnectionStatusHistory(final String connectionId) { - final Connection connection = connectionDAO.getConnection(connectionId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final StatusHistoryDTO dto = controllerFacade.getConnectionStatusHistory(connectionId); - return entityFactory.createStatusHistoryEntity(dto, permissions); - } - - private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); - final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public Set<ProcessorEntity> getProcessors(final String groupId, final boolean includeDescendants) { - final Set<ProcessorNode> processors = processorDAO.getProcessors(groupId, includeDescendants); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - return processors.stream() - .map(processor -> createProcessorEntity(processor, user)) - .collect(Collectors.toSet()); - } - - @Override - public TemplateDTO exportTemplate(final String id) { - final Template template = templateDAO.getTemplate(id); - final TemplateDTO templateDetails = template.getDetails(); - - final TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template); - templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet())); - return templateDTO; - } - - @Override - public TemplateDTO getTemplate(final String id) { - return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id)); - } - - @Override - public Set<TemplateEntity> getTemplates() { - return templateDAO.getTemplates().stream() - .map(template -> { - final TemplateDTO dto = dtoFactory.createTemplateDTO(template); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(template); - - final TemplateEntity entity = new TemplateEntity(); - entity.setId(dto.getId()); - entity.setPermissions(permissions); - entity.setTemplate(dto); - return entity; - }).collect(Collectors.toSet()); - } - - @Override - public Set<DocumentedTypeDTO> getWorkQueuePrioritizerTypes() { - return controllerFacade.getFlowFileComparatorTypes(); - } - - @Override - public Set<DocumentedTypeDTO> getProcessorTypes(final String bundleGroup, final String bundleArtifact, final String type) { - return controllerFacade.getFlowFileProcessorTypes(bundleGroup, bundleArtifact, type); - } - - @Override - public Set<DocumentedTypeDTO> getControllerServiceTypes(final String serviceType, final String serviceBundleGroup, final String serviceBundleArtifact, final String serviceBundleVersion, - final String bundleGroup, final String bundleArtifact, final String type) { - return controllerFacade.getControllerServiceTypes(serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion, bundleGroup, bundleArtifact, type); - } - - @Override - public Set<DocumentedTypeDTO> getReportingTaskTypes(final String bundleGroup, final String bundleArtifact, final String type) { - return controllerFacade.getReportingTaskTypes(bundleGroup, bundleArtifact, type); - } - - @Override - public ProcessorEntity getProcessor(final String id) { - final ProcessorNode processor = processorDAO.getProcessor(id); - return createProcessorEntity(processor, NiFiUserUtils.getNiFiUser()); - } - - @Override - public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) { - final ProcessorNode processor = processorDAO.getProcessor(id); - PropertyDescriptor descriptor = processor.getPropertyDescriptor(property); - - // return an invalid descriptor if the processor doesn't support this property - if (descriptor == null) { - descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build(); - } - - return dtoFactory.createPropertyDescriptorDto(descriptor, processor.getProcessGroup().getIdentifier()); - } - - @Override - public ProcessorStatusEntity getProcessorStatus(final String id) { - final ProcessorNode processor = processorDAO.getProcessor(id); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); - final ProcessorStatusDTO dto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id)); - return entityFactory.createProcessorStatusEntity(dto, permissions); - } - - @Override - public StatusHistoryEntity getProcessorStatusHistory(final String id) { - final ProcessorNode processor = processorDAO.getProcessor(id); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); - final StatusHistoryDTO dto = controllerFacade.getProcessorStatusHistory(id); - return entityFactory.createStatusHistoryEntity(dto, permissions); - } - - private boolean authorizeBulletin(final Bulletin bulletin) { - final String sourceId = bulletin.getSourceId(); - final ComponentType type = bulletin.getSourceType(); - - final Authorizable authorizable; - try { - switch (type) { - case PROCESSOR: - authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable(); - break; - case REPORTING_TASK: - authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable(); - break; - case CONTROLLER_SERVICE: - authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable(); - break; - case FLOW_CONTROLLER: - authorizable = controllerFacade; - break; - case INPUT_PORT: - authorizable = authorizableLookup.getInputPort(sourceId); - break; - case OUTPUT_PORT: - authorizable = authorizableLookup.getOutputPort(sourceId); - break; - case REMOTE_PROCESS_GROUP: - authorizable = authorizableLookup.getRemoteProcessGroup(sourceId); - break; - default: - throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build()); - } - } catch (final ResourceNotFoundException e) { - // if the underlying component is gone, disallow - return false; - } - - // perform the authorization - final AuthorizationResult result = authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - return Result.Approved.equals(result.getResult()); - } - - @Override - public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) { - // build the query - final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder() - .groupIdMatches(query.getGroupId()) - .sourceIdMatches(query.getSourceId()) - .nameMatches(query.getName()) - .messageMatches(query.getMessage()) - .after(query.getAfter()) - .limit(query.getLimit()); - - // perform the query - final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build()); - - // perform the query and generate the results - iterating in reverse order since we are - // getting the most recent results by ordering by timestamp desc above. this gets the - // exact results we want but in reverse order - final List<BulletinEntity> bulletinEntities = new ArrayList<>(); - for (final ListIterator<Bulletin> bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious(); ) { - final Bulletin bulletin = bulletinIter.previous(); - bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin))); - } - - // create the bulletin board - final BulletinBoardDTO bulletinBoard = new BulletinBoardDTO(); - bulletinBoard.setBulletins(bulletinEntities); - bulletinBoard.setGenerated(new Date()); - return bulletinBoard; - } - - @Override - public SystemDiagnosticsDTO getSystemDiagnostics() { - final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics(); - return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics); - } - - @Override - public List<ResourceDTO> getResources() { - final List<Resource> resources = controllerFacade.getResources(); - final List<ResourceDTO> resourceDtos = new ArrayList<>(resources.size()); - for (final Resource resource : resources) { - resourceDtos.add(dtoFactory.createResourceDto(resource)); - } - return resourceDtos; - } - - @Override - public void discoverCompatibleBundles(VersionedProcessGroup versionedGroup) { - BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), versionedGroup); - } - - @Override - public BundleCoordinate getCompatibleBundle(String type, BundleDTO bundleDTO) { - return BundleUtils.getCompatibleBundle(controllerFacade.getExtensionManager(), type, bundleDTO); - } - - @Override - public ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate) { - return controllerFacade.getExtensionManager().getTempComponent(classType, bundleCoordinate); - } - - /** - * Ensures the specified user has permission to access the specified port. This method does - * not utilize the DataTransferAuthorizable as that will enforce the entire chain is - * authorized for the transfer. This method is only invoked when obtaining the site to site - * details so the entire chain isn't necessary. - */ - private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) { - final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure()); - - // if site to site is not secure, allow all users - if (!isSiteToSiteSecure) { - return true; - } - - final Map<String, String> userContext; - if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) { - userContext = new HashMap<>(); - userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress()); - } else { - userContext = null; - } - - final AuthorizationRequest request = new AuthorizationRequest.Builder() - .resource(ResourceFactory.getDataTransferResource(port.getResource())) - .identity(user.getIdentity()) - .groups(user.getGroups()) - .anonymous(user.isAnonymous()) - .accessAttempt(false) - .action(RequestAction.WRITE) - .userContext(userContext) - .explanationSupplier(() -> "Unable to retrieve port details.") - .build(); - - final AuthorizationResult result = authorizer.authorize(request); - return Result.Approved.equals(result.getResult()); - } - - @Override - public ControllerDTO getSiteToSiteDetails() { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - if (user == null) { - throw new WebApplicationException(new Throwable("Unable to access details for current user.")); - } - - // serialize the input ports this NiFi has access to - final Set<PortDTO> inputPortDtos = new LinkedHashSet<>(); - final Set<RootGroupPort> inputPorts = controllerFacade.getInputPorts(); - for (final RootGroupPort inputPort : inputPorts) { - if (isUserAuthorized(user, inputPort)) { - final PortDTO dto = new PortDTO(); - dto.setId(inputPort.getIdentifier()); - dto.setName(inputPort.getName()); - dto.setComments(inputPort.getComments()); - dto.setState(inputPort.getScheduledState().toString()); - inputPortDtos.add(dto); - } - } - - // serialize the output ports this NiFi has access to - final Set<PortDTO> outputPortDtos = new LinkedHashSet<>(); - for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) { - if (isUserAuthorized(user, outputPort)) { - final PortDTO dto = new PortDTO(); - dto.setId(outputPort.getIdentifier()); - dto.setName(outputPort.getName()); - dto.setComments(outputPort.getComments()); - dto.setState(outputPort.getScheduledState().toString()); - outputPortDtos.add(dto); - } - } - - // get the root group - final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); - final ProcessGroupCounts counts = rootGroup.getCounts(); - - // create the controller dto - final ControllerDTO controllerDTO = new ControllerDTO(); - controllerDTO.setId(controllerFacade.getRootGroupId()); - controllerDTO.setInstanceId(controllerFacade.getInstanceId()); - controllerDTO.setName(controllerFacade.getName()); - controllerDTO.setComments(controllerFacade.getComments()); - controllerDTO.setInputPorts(inputPortDtos); - controllerDTO.setOutputPorts(outputPortDtos); - controllerDTO.setInputPortCount(inputPortDtos.size()); - controllerDTO.setOutputPortCount(outputPortDtos.size()); - controllerDTO.setRunningCount(counts.getRunningCount()); - controllerDTO.setStoppedCount(counts.getStoppedCount()); - controllerDTO.setInvalidCount(counts.getInvalidCount()); - controllerDTO.setDisabledCount(counts.getDisabledCount()); - - // determine the site to site configuration - controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort()); - controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort()); - controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure()); - - return controllerDTO; - } - - @Override - public ControllerConfigurationEntity getControllerConfiguration() { - final Revision rev = revisionManager.getRevision(FlowController.class.getSimpleName()); - final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade); - final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); - return entityFactory.createControllerConfigurationEntity(dto, revision, permissions); - } - - @Override - public ControllerBulletinsEntity getControllerBulletins() { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity(); - - final List<BulletinEntity> controllerBulletinEntities = new ArrayList<>(); - - final Authorizable controllerAuthorizable = authorizableLookup.getController(); - final boolean authorized = controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()); - controllerBulletinEntities.addAll(bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList())); - - // get the controller service bulletins - final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); - final List<Bulletin> allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery); - final List<BulletinEntity> controllerServiceBulletinEntities = new ArrayList<>(); - for (final Bulletin bulletin : allControllerServiceBulletins) { - try { - final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable(); - final boolean controllerServiceAuthorized = controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); - - final BulletinEntity controllerServiceBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized); - controllerServiceBulletinEntities.add(controllerServiceBulletin); - controllerBulletinEntities.add(controllerServiceBulletin); - } catch (final ResourceNotFoundException e) { - // controller service missing.. skip - } - } - controllerBulletinsEntity.setControllerServiceBulletins(controllerServiceBulletinEntities); - - // get the reporting task bulletins - final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); - final List<Bulletin> allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery); - final List<BulletinEntity> reportingTaskBulletinEntities = new ArrayList<>(); - for (final Bulletin bulletin : allReportingTaskBulletins) { - try { - final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable(); - final boolean reportingTaskAuthorizableAuthorized = reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); - - final BulletinEntity reportingTaskBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized); - reportingTaskBulletinEntities.add(reportingTaskBulletin); - controllerBulletinEntities.add(reportingTaskBulletin); - } catch (final ResourceNotFoundException e) { - // reporting task missing.. skip - } - } - controllerBulletinsEntity.setReportingTaskBulletins(reportingTaskBulletinEntities); - - controllerBulletinsEntity.setBulletins(pruneAndSortBulletins(controllerBulletinEntities, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER)); - return controllerBulletinsEntity; - } - - @Override - public FlowConfigurationEntity getFlowConfiguration() { - final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval(), - properties.getDefaultBackPressureObjectThreshold(), properties.getDefaultBackPressureDataSizeThreshold(),properties.getDcaeDistributorApiHostname()); - final FlowConfigurationEntity entity = new FlowConfigurationEntity(); - entity.setFlowConfiguration(dto); - return entity; - } - - @Override - public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) { - final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); - return createAccessPolicyEntity(accessPolicy); - } - - @Override - public AccessPolicyEntity getAccessPolicy(final RequestAction requestAction, final String resource) { - Authorizable authorizable; - try { - authorizable = authorizableLookup.getAuthorizableFromResource(resource); - } catch (final ResourceNotFoundException e) { - // unable to find the underlying authorizable... user authorized based on top level /policies... create - // an anonymous authorizable to attempt to locate an existing policy for this resource - authorizable = new Authorizable() { - @Override - public Authorizable getParentAuthorizable() { - return null; - } - - @Override - public Resource getResource() { - return new Resource() { - @Override - public String getIdentifier() { - return resource; - } - - @Override - public String getName() { - return resource; - } - - @Override - public String getSafeDescription() { - return "Policy " + resource; - } - }; - } - }; - } - - final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(requestAction, authorizable); - return createAccessPolicyEntity(accessPolicy); - } - - private AccessPolicyEntity createAccessPolicyEntity(final AccessPolicy accessPolicy) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicy.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicy.getIdentifier())); - final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); - return entityFactory.createAccessPolicyEntity( - dtoFactory.createAccessPolicyDto(accessPolicy, - accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()), - accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()), componentReference), - revision, permissions); - } - - @Override - public UserEntity getUser(final String userId) { - final User user = userDAO.getUser(userId); - return createUserEntity(user, true); - } - - @Override - public Set<UserEntity> getUsers() { - final Set<User> users = userDAO.getUsers(); - return users.stream() - .map(user -> createUserEntity(user, false)) - .collect(Collectors.toSet()); - } - - private UserEntity createUserEntity(final User user, final boolean enforceUserExistence) { - final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - final Set<TenantEntity> userGroups = userGroupDAO.getUserGroupsForUser(user.getIdentifier()).stream() - .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(enforceUserExistence)).collect(Collectors.toSet()); - final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUser(user.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups, policyEntities), userRevision, permissions); - } - - private UserGroupEntity createUserGroupEntity(final Group userGroup, final boolean enforceGroupExistence) { - final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); - final Set<TenantEntity> users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(enforceGroupExistence)).collect(Collectors.toSet()); - final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream() - .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); - return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users, policyEntities), userGroupRevision, permissions); - } - - @Override - public UserGroupEntity getUserGroup(final String userGroupId) { - final Group userGroup = userGroupDAO.getUserGroup(userGroupId); - return createUserGroupEntity(userGroup, true); - } - - @Override - public Set<UserGroupEntity> getUserGroups() { - final Set<Group> userGroups = userGroupDAO.getUserGroups(); - return userGroups.stream() - .map(userGroup -> createUserGroupEntity(userGroup, false)) - .collect(Collectors.toSet()); - } - - private LabelEntity createLabelEntity(final Label label) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); - return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), revision, permissions); - } - - @Override - public Set<LabelEntity> getLabels(final String groupId) { - final Set<Label> labels = labelDAO.getLabels(groupId); - return labels.stream() - .map(label -> createLabelEntity(label)) - .collect(Collectors.toSet()); - } - - @Override - public LabelEntity getLabel(final String labelId) { - final Label label = labelDAO.getLabel(labelId); - return createLabelEntity(label); - } - - private FunnelEntity createFunnelEntity(final Funnel funnel) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(funnel.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); - return entityFactory.createFunnelEntity(dtoFactory.createFunnelDto(funnel), revision, permissions); - } - - @Override - public Set<FunnelEntity> getFunnels(final String groupId) { - final Set<Funnel> funnels = funnelDAO.getFunnels(groupId); - return funnels.stream() - .map(funnel -> createFunnelEntity(funnel)) - .collect(Collectors.toSet()); - } - - @Override - public FunnelEntity getFunnel(final String funnelId) { - final Funnel funnel = funnelDAO.getFunnel(funnelId); - return createFunnelEntity(funnel); - } - - private PortEntity createInputPortEntity(final Port port) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, NiFiUserUtils.getNiFiUser()); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), NiFiUserUtils.getNiFiUser()); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities); - } - - private PortEntity createOutputPortEntity(final Port port) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, NiFiUserUtils.getNiFiUser()); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), NiFiUserUtils.getNiFiUser()); - final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public Set<PortEntity> getInputPorts(final String groupId) { - final Set<Port> inputPorts = inputPortDAO.getPorts(groupId); - return inputPorts.stream() - .map(port -> createInputPortEntity(port)) - .collect(Collectors.toSet()); - } - - @Override - public Set<PortEntity> getOutputPorts(final String groupId) { - final Set<Port> ports = outputPortDAO.getPorts(groupId); - return ports.stream() - .map(port -> createOutputPortEntity(port)) - .collect(Collectors.toSet()); - } - - private ProcessGroupEntity createProcessGroupEntity(final ProcessGroup group) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(group); - final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(group.getIdentifier())); - final List<BulletinEntity> bulletins = getProcessGroupBulletins(group); - return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, permissions, status, bulletins); - } - - private List<BulletinEntity> getProcessGroupBulletins(final ProcessGroup group) { - final List<Bulletin> bulletins = new ArrayList<>(bulletinRepository.findBulletinsForGroupBySource(group.getIdentifier())); - - for (final ProcessGroup descendantGroup : group.findAllProcessGroups()) { - bulletins.addAll(bulletinRepository.findBulletinsForGroupBySource(descendantGroup.getIdentifier())); - } - - List<BulletinEntity> bulletinEntities = new ArrayList<>(); - for (final Bulletin bulletin : bulletins) { - bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin))); - } - - return pruneAndSortBulletins(bulletinEntities, BulletinRepository.MAX_BULLETINS_PER_COMPONENT); - } - - private List<BulletinEntity> pruneAndSortBulletins(final List<BulletinEntity> bulletinEntities, final int maxBulletins) { - // sort the bulletins - Collections.sort(bulletinEntities, new Comparator<BulletinEntity>() { - @Override - public int compare(BulletinEntity o1, BulletinEntity o2) { - if (o1 == null && o2 == null) { - return 0; - } - if (o1 == null) { - return 1; - } - if (o2 == null) { - return -1; - } - - return -Long.compare(o1.getId(), o2.getId()); - } - }); - - // prune the response to only include the max number of bulletins - if (bulletinEntities.size() > maxBulletins) { - return bulletinEntities.subList(0, maxBulletins); - } else { - return bulletinEntities; - } - } - - @Override - public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId) { - final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId); - return groups.stream() - .map(group -> createProcessGroupEntity(group)) - .collect(Collectors.toSet()); - } - - private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg, final NiFiUser user) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpg.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg, user); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(rpg), user); - final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(rpg, controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createRemoteProcessGroupEntity(dtoFactory.createRemoteProcessGroupDto(rpg), revision, permissions, operatePermissions, status, bulletinEntities); - } - - @Override - public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final Set<RemoteProcessGroup> rpgs = remoteProcessGroupDAO.getRemoteProcessGroups(groupId); - return rpgs.stream() - .map(rpg -> createRemoteGroupEntity(rpg, user)) - .collect(Collectors.toSet()); - } - - @Override - public PortEntity getInputPort(final String inputPortId) { - final Port port = inputPortDAO.getPort(inputPortId); - return createInputPortEntity(port); - } - - @Override - public PortStatusEntity getInputPortStatus(final String inputPortId) { - final Port inputPort = inputPortDAO.getPort(inputPortId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPort); - final PortStatusDTO dto = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortId)); - return entityFactory.createPortStatusEntity(dto, permissions); - } - - @Override - public PortEntity getOutputPort(final String outputPortId) { - final Port port = outputPortDAO.getPort(outputPortId); - return createOutputPortEntity(port); - } - - @Override - public PortStatusEntity getOutputPortStatus(final String outputPortId) { - final Port outputPort = outputPortDAO.getPort(outputPortId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPort); - final PortStatusDTO dto = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortId)); - return entityFactory.createPortStatusEntity(dto, permissions); - } - - @Override - public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) { - final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); - return createRemoteGroupEntity(rpg, NiFiUserUtils.getNiFiUser()); - } - - @Override - public RemoteProcessGroupStatusEntity getRemoteProcessGroupStatus(final String id) { - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); - final RemoteProcessGroupStatusDTO dto = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(id)); - return entityFactory.createRemoteProcessGroupStatusEntity(dto, permissions); - } - - @Override - public StatusHistoryEntity getRemoteProcessGroupStatusHistory(final String id) { - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); - final StatusHistoryDTO dto = controllerFacade.getRemoteProcessGroupStatusHistory(id); - return entityFactory.createStatusHistoryEntity(dto, permissions); - } - - @Override - public CurrentUserEntity getCurrentUser() { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final CurrentUserEntity entity = new CurrentUserEntity(); - entity.setIdentity(user.getIdentity()); - entity.setAnonymous(user.isAnonymous()); - entity.setProvenancePermissions(dtoFactory.createPermissionsDto(authorizableLookup.getProvenance())); - entity.setCountersPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getCounters())); - entity.setTenantsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getTenant())); - entity.setControllerPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getController())); - entity.setPoliciesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPolicies())); - entity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem())); - entity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers())); - - entity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents())); - - final Set<ComponentRestrictionPermissionDTO> componentRestrictionPermissions = new HashSet<>(); - Arrays.stream(RequiredPermission.values()).forEach(requiredPermission -> { - final PermissionsDTO restrictionPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents(requiredPermission)); - - final RequiredPermissionDTO requiredPermissionDto = new RequiredPermissionDTO(); - requiredPermissionDto.setId(requiredPermission.getPermissionIdentifier()); - requiredPermissionDto.setLabel(requiredPermission.getPermissionLabel()); - - final ComponentRestrictionPermissionDTO componentRestrictionPermissionDto = new ComponentRestrictionPermissionDTO(); - componentRestrictionPermissionDto.setRequiredPermission(requiredPermissionDto); - componentRestrictionPermissionDto.setPermissions(restrictionPermissions); - - componentRestrictionPermissions.add(componentRestrictionPermissionDto); - }); - entity.setComponentRestrictionPermissions(componentRestrictionPermissions); - - return entity; - } - - @Override - public ProcessGroupFlowEntity getProcessGroupFlow(final String groupId) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - - // Get the Process Group Status but we only need a status depth of one because for any child process group, - // we ignore the status of each individual components. I.e., if Process Group A has child Group B, and child Group B - // has a Processor, we don't care about the individual stats of that Processor because the ProcessGroupFlowEntity - // doesn't include that anyway. So we can avoid including the information in the status that is returned. - final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId, 1); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager, this::getProcessGroupBulletins), permissions); - } - - @Override - public ProcessGroupEntity getProcessGroup(final String groupId) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - return createProcessGroupEntity(processGroup); - } - - private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) { - final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(serviceNode); - - final ControllerServiceReference ref = serviceNode.getReferences(); - final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref, serviceIds); - dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents()); - - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode, NiFiUserUtils.getNiFiUser()); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(serviceNode), NiFiUserUtils.getNiFiUser()); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createControllerServiceEntity(dto, revision, permissions, operatePermissions, bulletinEntities); - } - - @Override - public VariableRegistryEntity getVariableRegistry(final String groupId, final boolean includeAncestorGroups) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - if (processGroup == null) { - throw new ResourceNotFoundException("Could not find group with ID " + groupId); - } - - return createVariableRegistryEntity(processGroup, includeAncestorGroups); - } - - private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) { - final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup, revisionManager); - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - - if (includeAncestorGroups) { - ProcessGroup parent = processGroup.getParent(); - while (parent != null) { - final PermissionsDTO parentPerms = dtoFactory.createPermissionsDto(parent); - if (Boolean.TRUE.equals(parentPerms.getCanRead())) { - final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent, revisionManager); - final Set<VariableEntity> parentVariables = parentRegistryDto.getVariables(); - registryDto.getVariables().addAll(parentVariables); - } - - parent = parent.getParent(); - } - } - - return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions); - } - - @Override - public VariableRegistryEntity populateAffectedComponents(final VariableRegistryDTO variableRegistryDto) { - final String groupId = variableRegistryDto.getProcessGroupId(); - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - if (processGroup == null) { - throw new ResourceNotFoundException("Could not find group with ID " + groupId); - } - - final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup, revisionManager); - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions); - } - - @Override - public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) { - final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups); - final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet()); - - return serviceNodes.stream() - .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds)) - .collect(Collectors.toSet()); - } - - @Override - public ControllerServiceEntity getControllerService(final String controllerServiceId) { - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId)); - } - - @Override - public PropertyDescriptorDTO getControllerServicePropertyDescriptor(final String id, final String property) { - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id); - PropertyDescriptor descriptor = controllerService.getControllerServiceImplementation().getPropertyDescriptor(property); - - // return an invalid descriptor if the controller service doesn't support this property - if (descriptor == null) { - descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build(); - } - - final String groupId = controllerService.getProcessGroup() == null ? null : controllerService.getProcessGroup().getIdentifier(); - return dtoFactory.createPropertyDescriptorDto(descriptor, groupId); - } - - @Override - public ControllerServiceReferencingComponentsEntity getControllerServiceReferencingComponents(final String controllerServiceId) { - final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId); - final ControllerServiceReference ref = service.getReferences(); - return createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerServiceId)); - } - - private ReportingTaskEntity createReportingTaskEntity(final ReportingTaskNode reportingTask) { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(reportingTask.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); - final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createReportingTaskEntity(dtoFactory.createReportingTaskDto(reportingTask), revision, permissions, operatePermissions, bulletinEntities); - } - - @Override - public Set<ReportingTaskEntity> getReportingTasks() { - final Set<ReportingTaskNode> reportingTasks = reportingTaskDAO.getReportingTasks(); - return reportingTasks.stream() - .map(reportingTask -> createReportingTaskEntity(reportingTask)) - .collect(Collectors.toSet()); - } - - @Override - public ReportingTaskEntity getReportingTask(final String reportingTaskId) { - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); - return createReportingTaskEntity(reportingTask); - } - - @Override - public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(final String id, final String property) { - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(id); - PropertyDescriptor descriptor = reportingTask.getReportingTask().getPropertyDescriptor(property); - - // return an invalid descriptor if the reporting task doesn't support this property - if (descriptor == null) { - descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build(); - } - - return dtoFactory.createPropertyDescriptorDto(descriptor, null); - } - - @Override - public StatusHistoryEntity getProcessGroupStatusHistory(final String groupId) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - final StatusHistoryDTO dto = controllerFacade.getProcessGroupStatusHistory(groupId); - return entityFactory.createStatusHistoryEntity(dto, permissions); - } - - @Override - public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - - final VersionControlInformation currentVci = processGroup.getVersionControlInformation(); - final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1; - - // Create a VersionedProcessGroup snapshot of the flow as it is currently. - final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId); - - final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow(); - final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId(); - - final VersionedFlow versionedFlow = new VersionedFlow(); - versionedFlow.setBucketIdentifier(versionedFlowDto.getBucketId()); - versionedFlow.setCreatedTimestamp(System.currentTimeMillis()); - versionedFlow.setDescription(versionedFlowDto.getDescription()); - versionedFlow.setModifiedTimestamp(versionedFlow.getCreatedTimestamp()); - versionedFlow.setName(versionedFlowDto.getFlowName()); - versionedFlow.setIdentifier(flowId); - - // Add the Versioned Flow and first snapshot to the Flow Registry - final String registryId = requestEntity.getVersionedFlow().getRegistryId(); - final VersionedFlowSnapshot registeredSnapshot; - final VersionedFlow registeredFlow; - - String action = "create the flow"; - try { - // first, create the flow in the registry, if necessary - if (versionedFlowDto.getFlowId() == null) { - registeredFlow = registerVersionedFlow(registryId, versionedFlow); - } else { - registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId()); - } - - action = "add the local flow to the Flow Registry as the first Snapshot"; - - // add first snapshot to the flow in the registry - registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion); - } catch (final NiFiRegistryException e) { - throw new IllegalArgumentException(e.getLocalizedMessage()); - } catch (final IOException ioe) { - throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to " + action); - } - - final Bucket bucket = registeredSnapshot.getBucket(); - final VersionedFlow flow = registeredSnapshot.getFlow(); - - // Update the Process Group with the new VersionControlInformation. (Send this to all nodes). - final VersionControlInformationDTO vci = new VersionControlInformationDTO(); - vci.setBucketId(bucket.getIdentifier()); - vci.setBucketName(bucket.getName()); - vci.setFlowId(flow.getIdentifier()); - vci.setFlowName(flow.getName()); - vci.setFlowDescription(flow.getDescription()); - vci.setGroupId(groupId); - vci.setRegistryId(registryId); - vci.setRegistryName(getFlowRegistryName(registryId)); - vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion()); - vci.setState(VersionedFlowState.UP_TO_DATE.name()); - - final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup); - - final Revision groupRevision = revisionManager.getRevision(groupId); - final RevisionDTO groupRevisionDto = dtoFactory.createRevisionDTO(groupRevision); - - final VersionControlComponentMappingEntity entity = new VersionControlComponentMappingEntity(); - entity.setVersionControlInformation(vci); - entity.setProcessGroupRevision(groupRevisionDto); - entity.setVersionControlComponentMapping(mapping); - return entity; - } - - @Override - public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) { - final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); - if (registry == null) { - throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId); - } - - try { - return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser()); - } catch (final IOException | NiFiRegistryException e) { - throw new NiFiCoreException("Failed to remove flow from Flow Registry due to " + e.getMessage(), e); - } - } - - @Override - public VersionControlInformationEntity getVersionControlInformation(final String groupId) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); - if (versionControlInfo == null) { - return null; - } - - final VersionControlInformationDTO versionControlDto = dtoFactory.createVersionControlInformationDto(processGroup); - final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId)); - return entityFactory.createVersionControlInformationEntity(versionControlDto, groupRevision); - } - - private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); - final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager()); - final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false); - return versionedGroup; - } - - @Override - public FlowComparisonEntity getLocalModifications(final String processGroupId) { - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); - final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); - if (versionControlInfo == null) { - throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control"); - } - - final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryIdentifier()); - if (flowRegistry == null) { - throw new IllegalStateException("Process Group with ID " + processGroupId + " is tracking to a flow in Flow Registry with ID " + versionControlInfo.getRegistryIdentifier() - + " but cannot find a Flow Registry with that identifier"); - } - - final VersionedFlowSnapshot versionedFlowSnapshot; - try { - versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(), - versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true, NiFiUserUtils.getNiFiUser()); - } catch (final IOException | NiFiRegistryException e) { - throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e); - } - - final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager()); - final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true); - final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); - - final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); - final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup); - - final Set<String> ancestorServiceIds = getAncestorGroupServiceIds(processGroup); - final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor()); - final FlowComparison flowComparison = flowComparator.compare(); - - final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison); - - final FlowComparisonEntity entity = new FlowComparisonEntity(); - entity.setComponentDifferences(differenceDtos); - return entity; - } - - private Set<String> getAncestorGroupServiceIds(final ProcessGroup group) { - final Set<String> ancestorServiceIds; - ProcessGroup parentGroup = group.getParent(); - - if (parentGroup == null) { - ancestorServiceIds = Collections.emptySet(); - } else { - ancestorServiceIds = parentGroup.getControllerServices(true).stream() - .map(cs -> { - // We want to map the Controller Service to its Versioned Component ID, if it has one. - // If it does not have one, we want to generate it in the same way that our Flow Mapper does - // because this allows us to find the Controller Service when doing a Flow Diff. - final Optional<String> versionedId = cs.getVersionedComponentId(); - if (versionedId.isPresent()) { - return versionedId.get(); - } - - return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString(); - }) - .collect(Collectors.toSet()); - } - - return ancestorServiceIds; - } - - @Override - public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) { - final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); - if (registry == null) { - throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); - } - - try { - return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser()); - } catch (final IOException | NiFiRegistryException e) { - throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e); - } - } - - private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException { - final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); - if (registry == null) { - throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); - } - - return registry.getVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser()); - } - - @Override - public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow, - final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) { - final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); - if (registry == null) { - throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); - } - - try { - return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser()); - } catch (final IOException | NiFiRegistryException e) { - throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e); - } - } - - @Override - public VersionControlInformationEntity setVersionControlInformation(final Revision revision, final String processGroupId, - final VersionControlInformationDTO versionControlInfo, final Map<String, String> versionedComponentMapping) { - - final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); - - final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision, - group, - () -> processGroupDAO.updateVersionControlInformation(versionControlInfo, versionedComponentMapping), - processGroup -> dtoFactory.createVersionControlInformationDto(processGroup)); - - return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification())); - } - - @Override - public VersionControlInformationEntity deleteVersionControl(final Revision revision, final String processGroupId) { - final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); - - final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision, - group, - () -> processGroupDAO.disconnectVersionControl(processGroupId), - processGroup -> dtoFactory.createVersionControlInformationDto(group)); - - return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification())); - } - - @Override - public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty); - } - - @Override - public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId); - } - - @Override - public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - group.verifyCanRevertLocalModifications(); - - // verify that the process group can be updated to the given snapshot. We do not verify that connections can - // be removed, because the flow may still be running, and it only matters that the connections can be removed once the components - // have been stopped. - group.verifyCanUpdate(versionedFlowSnapshot, false, false); - } - - @Override - public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot) { - final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); - - final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager()); - final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true); - - final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents); - final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents()); - - final Set<String> ancestorGroupServiceIds = getAncestorGroupServiceIds(group); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor()); - final FlowComparison comparison = flowComparator.compare(); - - final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream() - .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow. - .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED) - .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS) - .filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES) - .map(difference -> { - final VersionedComponent localComponent = difference.getComponentA(); - - final String state; - switch (localComponent.getComponentType()) { - case CONTROLLER_SERVICE: - final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId(); - state = controllerServiceDAO.getControllerService(serviceId).getState().name(); - break; - case PROCESSOR: - final String processorId = ((InstantiatedVersionedProcessor) localComponent).getInstanceId(); - state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name(); - break; - case REMOTE_INPUT_PORT: - final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent; - state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name(); - break; - case REMOTE_OUTPUT_PORT: - final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent; - state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name(); - break; - default: - state = null; - break; - } - - return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state); - }) - .collect(Collectors.toCollection(HashSet::new)); - - for (final FlowDifference difference : comparison.getDifferences()) { - // Ignore these as local differences for now because we can't do anything with it - if (difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED) { - continue; - } - - // Ignore differences for adding remote ports - if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) { - continue; - } - - if (FlowDifferenceFilters.isIgnorableVersionedFlowCoordinateChange(difference)) { - continue; - } - - final VersionedComponent localComponent = difference.getComponentA(); - if (localComponent == null) { - continue; - } - - // If any Process Group is removed, consider all components below that Process Group as an affected component - if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP) { - final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId(); - final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId); - - localGroup.findAllProcessors().stream() - .map(comp -> createAffectedComponentEntity(comp)) - .forEach(affectedComponents::add); - localGroup.findAllFunnels().stream() - .map(comp -> createAffectedComponentEntity(comp)) - .forEach(affectedComponents::add); - localGroup.findAllInputPorts().stream() - .map(comp -> createAffectedComponentEntity(comp)) - .forEach(affectedComponents::add); - localGroup.findAllOutputPorts().stream() - .map(comp -> createAffectedComponentEntity(comp)) - .forEach(affectedComponents::add); - localGroup.findAllRemoteProcessGroups().stream() - .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream())) - .map(comp -> createAffectedComponentEntity(comp)) - .forEach(affectedComponents::add); - localGroup.findAllControllerServices().stream() - .map(comp -> createAffectedComponentEntity(comp)) - .forEach(affectedComponents::add); - } - - if (localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE) { - final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId(); - final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId); - - final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); - for (final ControllerServiceNode referencingService : referencingServices) { - affectedComponents.add(createAffectedComponentEntity(referencingService)); - } - - final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class); - for (final ProcessorNode referencingProcessor : referencingProcessors) { - affectedComponents.add(createAffectedComponentEntity(referencingProcessor)); - } - } - } - - // Create a map of all connectable components by versioned component ID to the connectable component itself - final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>(); - mapToConnectableId(group.findAllFunnels(), connectablesByVersionId); - mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId); - mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId); - mapToConnectableId(group.findAllProcessors(), connectablesByVersionId); - - final List<RemoteGroupPort> remotePorts = new ArrayList<>(); - for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) { - remotePorts.addAll(rpg.getInputPorts()); - remotePorts.addAll(rpg.getOutputPorts()); - } - mapToConnectableId(remotePorts, connectablesByVersionId); - - // If any connection is added or modified, we need to stop both the source (if it exists in the flow currently) - // and the destination (if it exists in the flow currently). - for (final FlowDifference difference : comparison.getDifferences()) { - VersionedComponent component = difference.getComponentA(); - if (component == null) { - component = difference.getComponentB(); - } - - if (component.getComponentType() != org.apache.nifi.registry.flow.ComponentType.CONNECTION) { - continue; - } - - final VersionedConnection connection = (VersionedConnection) component; - - final String sourceVersionedId = connection.getSource().getId(); - final List<Connectable> sources = connectablesByVersionId.get(sourceVersionedId); - if (sources != null) { - for (final Connectable source : sources) { - affectedComponents.add(createAffectedComponentEntity(source)); - } - } - - final String destinationVersionId = connection.getDestination().getId(); - final List<Connectable> destinations = connectablesByVersionId.get(destinationVersionId); - if (destinations != null) { - for (final Connectable destination : destinations) { - affectedComponents.add(createAffectedComponentEntity(destination)); - } - } - } - - return affectedComponents; - } - - private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) { - for (final Connectable connectable : connectables) { - final Optional<String> versionedIdOption = connectable.getVersionedComponentId(); - - // Determine the Versioned ID by using the ID that is assigned, if one is. Otherwise, - // we will calculate the Versioned ID. This allows us to map connectables that currently are not under - // version control. We have to do this so that if we are changing flow versions and have a component that is running and it does not exist - // in the Versioned Flow, we still need to be able to create an AffectedComponentDTO for it. - final String versionedId; - if (versionedIdOption.isPresent()) { - versionedId = versionedIdOption.get(); - } else { - versionedId = UUID.nameUUIDFromBytes(connectable.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString(); - } - - final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId, key -> new ArrayList<>()); - byVersionedId.add(connectable); - } - } - - - private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable) { - final AffectedComponentEntity entity = new AffectedComponentEntity(); - entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectable.getIdentifier()))); - entity.setId(connectable.getIdentifier()); - - final Authorizable authorizable = getAuthorizable(connectable); - final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable); - entity.setPermissions(permissionsDto); - - final AffectedComponentDTO dto = new AffectedComponentDTO(); - dto.setId(connectable.getIdentifier()); - dto.setReferenceType(connectable.getConnectableType().name()); - dto.setState(connectable.getScheduledState().name()); - - final String groupId = connectable instanceof RemoteGroupPort ? ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : connectable.getProcessGroupIdentifier(); - dto.setProcessGroupId(groupId); - - entity.setComponent(dto); - return entity; - } - - private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode) { - final AffectedComponentEntity entity = new AffectedComponentEntity(); - entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()))); - entity.setId(serviceNode.getIdentifier()); - - final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable(); - final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable); - entity.setPermissions(permissionsDto); - - final AffectedComponentDTO dto = new AffectedComponentDTO(); - dto.setId(serviceNode.getIdentifier()); - dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); - dto.setProcessGroupId(serviceNode.getProcessGroupIdentifier()); - dto.setState(serviceNode.getState().name()); - - entity.setComponent(dto); - return entity; - } - - private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState) { - final AffectedComponentEntity entity = new AffectedComponentEntity(); - entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId()))); - entity.setId(instance.getInstanceId()); - - final Authorizable authorizable = getAuthorizable(componentTypeName, instance); - final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable); - entity.setPermissions(permissionsDto); - - final AffectedComponentDTO dto = new AffectedComponentDTO(); - dto.setId(instance.getInstanceId()); - dto.setReferenceType(componentTypeName); - dto.setProcessGroupId(instance.getInstanceGroupId()); - dto.setState(componentState); - - entity.setComponent(dto); - return entity; - } - - - private Authorizable getAuthorizable(final Connectable connectable) { - switch (connectable.getConnectableType()) { - case REMOTE_INPUT_PORT: - case REMOTE_OUTPUT_PORT: - final String rpgId = ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier(); - return authorizableLookup.getRemoteProcessGroup(rpgId); - default: - return authorizableLookup.getLocalConnectable(connectable.getIdentifier()); - } - } - - private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) { - final String componentId = versionedComponent.getInstanceId(); - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE.name())) { - return authorizableLookup.getControllerService(componentId).getAuthorizable(); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONNECTION.name())) { - return authorizableLookup.getConnection(componentId).getAuthorizable(); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.FUNNEL.name())) { - return authorizableLookup.getFunnel(componentId); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.INPUT_PORT.name())) { - return authorizableLookup.getInputPort(componentId); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.OUTPUT_PORT.name())) { - return authorizableLookup.getOutputPort(componentId); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.LABEL.name())) { - return authorizableLookup.getLabel(componentId); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP.name())) { - return authorizableLookup.getProcessGroup(componentId).getAuthorizable(); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESSOR.name())) { - return authorizableLookup.getProcessor(componentId).getAuthorizable(); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_INPUT_PORT.name())) { - return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId()); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_OUTPUT_PORT.name())) { - return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId()); - } - - if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) { - return authorizableLookup.getRemoteProcessGroup(componentId); - } - - return null; - } - - @Override - public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) { - final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId()); - if (flowRegistry == null) { - throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId()); - } - - final VersionedFlowSnapshot snapshot; - try { - snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser()); - } catch (final NiFiRegistryException | IOException e) { - logger.error(e.getMessage(), e); - throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket " - + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion()); - } - - return snapshot; - } - - @Override - public String getFlowRegistryName(final String flowRegistryId) { - final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId); - return flowRegistry == null ? flowRegistryId : flowRegistry.getName(); - } - - private List<Revision> getComponentRevisions(final ProcessGroup processGroup, final boolean includeGroupRevision) { - final List<Revision> revisions = new ArrayList<>(); - if (includeGroupRevision) { - revisions.add(revisionManager.getRevision(processGroup.getIdentifier())); - } - - processGroup.findAllConnections().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllControllerServices().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllFunnels().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllInputPorts().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllOutputPorts().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllLabels().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllProcessGroups().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllProcessors().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllRemoteProcessGroups().stream() - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - processGroup.findAllRemoteProcessGroups().stream() - .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream())) - .map(component -> revisionManager.getRevision(component.getIdentifier())) - .forEach(revisions::add); - - return revisions; - } - - @Override - public ProcessGroupEntity updateProcessGroupContents(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, - final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) { - - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - final List<Revision> revisions = getComponentRevisions(processGroup, false); - revisions.add(revision); - - final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions); - - final RevisionUpdate<ProcessGroupDTO> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<ProcessGroupDTO>() { - @Override - public RevisionUpdate<ProcessGroupDTO> update() { - // update the Process Group - processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows); - - // update the revisions - final Set<Revision> updatedRevisions = revisions.stream() - .map(rev -> revisionManager.getRevision(rev.getComponentId()).incrementRevision(revision.getClientId())) - .collect(Collectors.toSet()); - - // save - controllerFacade.save(); - - // gather details for response - final ProcessGroupDTO dto = dtoFactory.createProcessGroupDto(processGroup); - - final Revision updatedRevision = revisionManager.getRevision(groupId).incrementRevision(revision.getClientId()); - final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); - return new StandardRevisionUpdate<>(dto, lastModification, updatedRevisions); - } - }); - - final FlowModification lastModification = revisionUpdate.getLastModification(); - - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(lastModification); - final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier())); - final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier())); - final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessGroupEntity(revisionUpdate.getComponent(), updatedRevision, permissions, status, bulletinEntities); - } - - private AuthorizationResult authorizeAction(final Action action) { - final String sourceId = action.getSourceId(); - final Component type = action.getSourceType(); - - Authorizable authorizable; - try { - switch (type) { - case Processor: - authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable(); - break; - case ReportingTask: - authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable(); - break; - case ControllerService: - authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable(); - break; - case Controller: - authorizable = controllerFacade; - break; - case InputPort: - authorizable = authorizableLookup.getInputPort(sourceId); - break; - case OutputPort: - authorizable = authorizableLookup.getOutputPort(sourceId); - break; - case ProcessGroup: - authorizable = authorizableLookup.getProcessGroup(sourceId).getAuthorizable(); - break; - case RemoteProcessGroup: - authorizable = authorizableLookup.getRemoteProcessGroup(sourceId); - break; - case Funnel: - authorizable = authorizableLookup.getFunnel(sourceId); - break; - case Connection: - authorizable = authorizableLookup.getConnection(sourceId).getAuthorizable(); - break; - case AccessPolicy: - authorizable = authorizableLookup.getAccessPolicyById(sourceId); - break; - case User: - case UserGroup: - authorizable = authorizableLookup.getTenant(); - break; - default: - throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this action.").build()); - } - } catch (final ResourceNotFoundException e) { - // if the underlying component is gone, use the controller to see if permissions should be allowed - authorizable = controllerFacade; - } - - // perform the authorization - return authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - } - - @Override - public HistoryDTO getActions(final HistoryQueryDTO historyQueryDto) { - // extract the query criteria - final HistoryQuery historyQuery = new HistoryQuery(); - historyQuery.setStartDate(historyQueryDto.getStartDate()); - historyQuery.setEndDate(historyQueryDto.getEndDate()); - historyQuery.setSourceId(historyQueryDto.getSourceId()); - historyQuery.setUserIdentity(historyQueryDto.getUserIdentity()); - historyQuery.setOffset(historyQueryDto.getOffset()); - historyQuery.setCount(historyQueryDto.getCount()); - historyQuery.setSortColumn(historyQueryDto.getSortColumn()); - historyQuery.setSortOrder(historyQueryDto.getSortOrder()); - - // perform the query - final History history = auditService.getActions(historyQuery); - - // only retain authorized actions - final HistoryDTO historyDto = dtoFactory.createHistoryDto(history); - if (history.getActions() != null) { - final List<ActionEntity> actionEntities = new ArrayList<>(); - for (final Action action : history.getActions()) { - final AuthorizationResult result = authorizeAction(action); - actionEntities.add(entityFactory.createActionEntity(dtoFactory.createActionDto(action), Result.Approved.equals(result.getResult()))); - } - historyDto.setActions(actionEntities); - } - - // create the response - return historyDto; - } - - @Override - public ActionEntity getAction(final Integer actionId) { - // get the action - final Action action = auditService.getAction(actionId); - - // ensure the action was found - if (action == null) { - throw new ResourceNotFoundException(String.format("Unable to find action with id '%s'.", actionId)); - } - - final AuthorizationResult result = authorizeAction(action); - final boolean authorized = Result.Approved.equals(result.getResult()); - if (!authorized) { - throw new AccessDeniedException(result.getExplanation()); - } - - // return the action - return entityFactory.createActionEntity(dtoFactory.createActionDto(action), authorized); - } - - @Override - public ComponentHistoryDTO getComponentHistory(final String componentId) { - final Map<String, PropertyHistoryDTO> propertyHistoryDtos = new LinkedHashMap<>(); - final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(componentId); - - for (final Map.Entry<String, List<PreviousValue>> entry : propertyHistory.entrySet()) { - final List<PreviousValueDTO> previousValueDtos = new ArrayList<>(); - - for (final PreviousValue previousValue : entry.getValue()) { - final PreviousValueDTO dto = new PreviousValueDTO(); - dto.setPreviousValue(previousValue.getPreviousValue()); - dto.setTimestamp(previousValue.getTimestamp()); - dto.setUserIdentity(previousValue.getUserIdentity()); - previousValueDtos.add(dto); - } - - if (!previousValueDtos.isEmpty()) { - final PropertyHistoryDTO propertyHistoryDto = new PropertyHistoryDTO(); - propertyHistoryDto.setPreviousValues(previousValueDtos); - propertyHistoryDtos.put(entry.getKey(), propertyHistoryDto); - } - } - - final ComponentHistoryDTO history = new ComponentHistoryDTO(); - history.setComponentId(componentId); - history.setPropertyHistory(propertyHistoryDtos); - - return history; - } - - @Override - public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) { - final ProcessorNode processor = processorDAO.getProcessor(id); - final ProcessorStatus processorStatus = controllerFacade.getProcessorStatus(id); - - // Generate Processor Diagnostics - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final ProcessorDiagnosticsDTO dto = controllerFacade.getProcessorDiagnostics(processor, processorStatus, bulletinRepository, serviceId -> { - final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId); - return createControllerServiceEntity(serviceNode, Collections.emptySet()); - }); - - // Filter anything out of diagnostics that the user is not authorized to see. - final List<JVMDiagnosticsSnapshotDTO> jvmDiagnosticsSnaphots = new ArrayList<>(); - final JVMDiagnosticsDTO jvmDiagnostics = dto.getJvmDiagnostics(); - jvmDiagnosticsSnaphots.add(jvmDiagnostics.getAggregateSnapshot()); - - // filter controller-related information - final boolean canReadController = authorizableLookup.getController().isAuthorized(authorizer, RequestAction.READ, user); - if (!canReadController) { - for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) { - snapshot.setControllerDiagnostics(null); - } - } - - // filter system diagnostics information - final boolean canReadSystem = authorizableLookup.getSystem().isAuthorized(authorizer, RequestAction.READ, user); - if (!canReadSystem) { - for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) { - snapshot.setSystemDiagnosticsDto(null); - } - } - - final boolean canReadFlow = authorizableLookup.getFlow().isAuthorized(authorizer, RequestAction.READ, user); - if (!canReadFlow) { - for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) { - snapshot.setFlowDiagnosticsDto(null); - } - } - - // filter connections - final Predicate<ConnectionDiagnosticsDTO> connectionAuthorized = connectionDiagnostics -> { - final String connectionId = connectionDiagnostics.getConnection().getId(); - return authorizableLookup.getConnection(connectionId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user); - }; - - // Filter incoming connections by what user is authorized to READ - final Set<ConnectionDiagnosticsDTO> incoming = dto.getIncomingConnections(); - final Set<ConnectionDiagnosticsDTO> filteredIncoming = incoming.stream() - .filter(connectionAuthorized) - .collect(Collectors.toSet()); - - dto.setIncomingConnections(filteredIncoming); - - // Filter outgoing connections by what user is authorized to READ - final Set<ConnectionDiagnosticsDTO> outgoing = dto.getOutgoingConnections(); - final Set<ConnectionDiagnosticsDTO> filteredOutgoing = outgoing.stream() - .filter(connectionAuthorized) - .collect(Collectors.toSet()); - dto.setOutgoingConnections(filteredOutgoing); - - // Filter out any controller services that are referenced by the Processor - final Set<ControllerServiceDiagnosticsDTO> referencedServices = dto.getReferencedControllerServices(); - final Set<ControllerServiceDiagnosticsDTO> filteredReferencedServices = referencedServices.stream() - .filter(csDiagnostics -> { - final String csId = csDiagnostics.getControllerService().getId(); - return authorizableLookup.getControllerService(csId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user); - }) - .map(csDiagnostics -> { - // Filter out any referencing components because those are generally not relevant from this context. - final ControllerServiceDTO serviceDto = csDiagnostics.getControllerService().getComponent(); - if (serviceDto != null) { - serviceDto.setReferencingComponents(null); - } - return csDiagnostics; - }) - .collect(Collectors.toSet()); - dto.setReferencedControllerServices(filteredReferencedServices); - - final Revision revision = revisionManager.getRevision(id); - final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revision); - final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(processor); - final List<BulletinEntity> bulletins = bulletinRepository.findBulletinsForSource(id).stream() - .map(bulletin -> dtoFactory.createBulletinDto(bulletin)) - .map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissionsDto.getCanRead())) - .collect(Collectors.toList()); - - final ProcessorStatusDTO processorStatusDto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); - return entityFactory.createProcessorDiagnosticsEntity(dto, revisionDto, permissionsDto, processorStatusDto, bulletins); - } - - @Override - public boolean isClustered() { - return controllerFacade.isClustered(); - } - - @Override - public String getNodeId() { - final NodeIdentifier nodeId = controllerFacade.getNodeId(); - if (nodeId != null) { - return nodeId.getId(); - } else { - return null; - } - } - - @Override - public ClusterDTO getCluster() { - // create cluster summary dto - final ClusterDTO clusterDto = new ClusterDTO(); - - // set current time - clusterDto.setGenerated(new Date()); - - // create node dtos - final List<NodeDTO> nodeDtos = clusterCoordinator.getNodeIdentifiers().stream() - .map(nodeId -> getNode(nodeId)) - .collect(Collectors.toList()); - clusterDto.setNodes(nodeDtos); - - return clusterDto; - } - - @Override - public NodeDTO getNode(final String nodeId) { - final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId); - return getNode(nodeIdentifier); - } - - private NodeDTO getNode(final NodeIdentifier nodeId) { - final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId); - final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId); - final Set<String> roles = getRoles(nodeId); - final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId); - return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, roles); - } - - private Set<String> getRoles(final NodeIdentifier nodeId) { - final Set<String> roles = new HashSet<>(); - final String nodeAddress = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort(); - - for (final String roleName : ClusterRoles.getAllRoles()) { - final String leader = leaderElectionManager.getLeader(roleName); - if (leader == null) { - continue; - } - - if (leader.equals(nodeAddress)) { - roles.add(roleName); - } - } - - return roles; - } - - @Override - public void deleteNode(final String nodeId) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - if (user == null) { - throw new WebApplicationException(new Throwable("Unable to access details for current user.")); - } - - final String userDn = user.getIdentity(); - final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId); - if (nodeIdentifier == null) { - throw new UnknownNodeException("Cannot remove Node with ID " + nodeId + " because it is not part of the cluster"); - } - - final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier); - if (!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && !nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) { - throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId + - " because it is not disconnected or offloaded, current state = " + nodeConnectionStatus.getState()); - } - - clusterCoordinator.removeNode(nodeIdentifier, userDn); - heartbeatMonitor.removeHeartbeat(nodeIdentifier); - } - - /* reusable function declarations for converting ids to tenant entities */ - private Function<String, TenantEntity> mapUserGroupIdToTenantEntity(final boolean enforceGroupExistence) { - return userGroupId -> { - final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroupId)); - - final Group group; - if (enforceGroupExistence || userGroupDAO.hasUserGroup(userGroupId)) { - group = userGroupDAO.getUserGroup(userGroupId); - } else { - group = new Group.Builder().identifier(userGroupId).name("Group ID - " + userGroupId + " (removed externally)").build(); - } - - return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(group), userGroupRevision, - dtoFactory.createPermissionsDto(authorizableLookup.getTenant())); - }; - } - - private Function<String, TenantEntity> mapUserIdToTenantEntity(final boolean enforceUserExistence) { - return userId -> { - final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId)); - - final User user; - if (enforceUserExistence || userDAO.hasUser(userId)) { - user = userDAO.getUser(userId); - } else { - user = new User.Builder().identifier(userId).identity("User ID - " + userId + " (removed externally)").build(); - } - - return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(user), userRevision, - dtoFactory.createPermissionsDto(authorizableLookup.getTenant())); - }; - } - - - /* setters */ - public void setProperties(final NiFiProperties properties) { - this.properties = properties; - } - - public void setControllerFacade(final ControllerFacade controllerFacade) { - this.controllerFacade = controllerFacade; - } - - public void setRemoteProcessGroupDAO(final RemoteProcessGroupDAO remoteProcessGroupDAO) { - this.remoteProcessGroupDAO = remoteProcessGroupDAO; - } - - public void setLabelDAO(final LabelDAO labelDAO) { - this.labelDAO = labelDAO; - } - - public void setFunnelDAO(final FunnelDAO funnelDAO) { - this.funnelDAO = funnelDAO; - } - - public void setSnippetDAO(final SnippetDAO snippetDAO) { - this.snippetDAO = snippetDAO; - } - - public void setProcessorDAO(final ProcessorDAO processorDAO) { - this.processorDAO = processorDAO; - } - - public void setConnectionDAO(final ConnectionDAO connectionDAO) { - this.connectionDAO = connectionDAO; - } - - public void setAuditService(final AuditService auditService) { - this.auditService = auditService; - } - - public void setRevisionManager(final RevisionManager revisionManager) { - this.revisionManager = revisionManager; - } - - public void setDtoFactory(final DtoFactory dtoFactory) { - this.dtoFactory = dtoFactory; - } - - public void setEntityFactory(final EntityFactory entityFactory) { - this.entityFactory = entityFactory; - } - - public void setInputPortDAO(final PortDAO inputPortDAO) { - this.inputPortDAO = inputPortDAO; - } - - public void setOutputPortDAO(final PortDAO outputPortDAO) { - this.outputPortDAO = outputPortDAO; - } - - public void setProcessGroupDAO(final ProcessGroupDAO processGroupDAO) { - this.processGroupDAO = processGroupDAO; - } - - public void setControllerServiceDAO(final ControllerServiceDAO controllerServiceDAO) { - this.controllerServiceDAO = controllerServiceDAO; - } - - public void setReportingTaskDAO(final ReportingTaskDAO reportingTaskDAO) { - this.reportingTaskDAO = reportingTaskDAO; - } - - public void setTemplateDAO(final TemplateDAO templateDAO) { - this.templateDAO = templateDAO; - } - - public void setSnippetUtils(final SnippetUtils snippetUtils) { - this.snippetUtils = snippetUtils; - } - - public void setAuthorizableLookup(final AuthorizableLookup authorizableLookup) { - this.authorizableLookup = authorizableLookup; - } - - public void setAuthorizer(final Authorizer authorizer) { - this.authorizer = authorizer; - } - - public void setUserDAO(final UserDAO userDAO) { - this.userDAO = userDAO; - } - - public void setUserGroupDAO(final UserGroupDAO userGroupDAO) { - this.userGroupDAO = userGroupDAO; - } - - public void setAccessPolicyDAO(final AccessPolicyDAO accessPolicyDAO) { - this.accessPolicyDAO = accessPolicyDAO; - } - - public void setClusterCoordinator(final ClusterCoordinator coordinator) { - this.clusterCoordinator = coordinator; - } - - public void setHeartbeatMonitor(final HeartbeatMonitor heartbeatMonitor) { - this.heartbeatMonitor = heartbeatMonitor; - } - - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) { - this.leaderElectionManager = leaderElectionManager; - } - - public void setRegistryDAO(RegistryDAO registryDao) { - this.registryDAO = registryDao; - } - - public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) { - this.flowRegistryClient = flowRegistryClient; - } -} diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 2943e10..ad45f30 100644 --- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -282,8 +282,7 @@ public final class DtoFactory { public FlowConfigurationDTO createFlowConfigurationDto(final String autoRefreshInterval, final Long defaultBackPressureObjectThreshold, - final String defaultBackPressureDataSizeThreshold, - final String dcaeDistributorApiHostname) { + final String defaultBackPressureDataSizeThreshold) { final FlowConfigurationDTO dto = new FlowConfigurationDTO(); // get the refresh interval @@ -293,9 +292,6 @@ public final class DtoFactory { dto.setSupportsConfigurableUsersAndGroups(AuthorizerCapabilityDetection.isConfigurableUserGroupProvider(authorizer)); dto.setSupportsConfigurableAuthorizer(AuthorizerCapabilityDetection.isConfigurableAccessPolicyProvider(authorizer)); - /* Renu - getting host IP */ - dto.setDcaeDistributorApiHostname(dcaeDistributorApiHostname); - final Date now = new Date(); dto.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime())); dto.setCurrentTime(now); @@ -2042,8 +2038,8 @@ public final class DtoFactory { final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(connection.getIdentifier())); final PermissionsDTO accessPolicy = createPermissionsDto(connection); final ConnectionStatusDTO status = getComponentStatus( - () -> groupStatus.getConnectionStatus().stream().filter(connectionStatus -> connection.getIdentifier().equals(connectionStatus.getId())).findFirst().orElse(null), - connectionStatus -> createConnectionStatusDto(connectionStatus) + () -> groupStatus.getConnectionStatus().stream().filter(connectionStatus -> connection.getIdentifier().equals(connectionStatus.getId())).findFirst().orElse(null), + connectionStatus -> createConnectionStatusDto(connectionStatus) ); flow.getConnections().add(entityFactory.createConnectionEntity(dto, revision, accessPolicy, status)); } @@ -2067,8 +2063,8 @@ public final class DtoFactory { final PermissionsDTO permissions = createPermissionsDto(inputPort); final PermissionsDTO operatePermissions = createPermissionsDto(new OperationAuthorizable(inputPort)); final PortStatusDTO status = getComponentStatus( - () -> groupStatus.getInputPortStatus().stream().filter(inputPortStatus -> inputPort.getIdentifier().equals(inputPortStatus.getId())).findFirst().orElse(null), - inputPortStatus -> createPortStatusDto(inputPortStatus) + () -> groupStatus.getInputPortStatus().stream().filter(inputPortStatus -> inputPort.getIdentifier().equals(inputPortStatus.getId())).findFirst().orElse(null), + inputPortStatus -> createPortStatusDto(inputPortStatus) ); final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPort.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2084,8 +2080,8 @@ public final class DtoFactory { final PermissionsDTO permissions = createPermissionsDto(outputPort); final PermissionsDTO operatePermissions = createPermissionsDto(new OperationAuthorizable(outputPort)); final PortStatusDTO status = getComponentStatus( - () -> groupStatus.getOutputPortStatus().stream().filter(outputPortStatus -> outputPort.getIdentifier().equals(outputPortStatus.getId())).findFirst().orElse(null), - outputPortStatus -> createPortStatusDto(outputPortStatus) + () -> groupStatus.getOutputPortStatus().stream().filter(outputPortStatus -> outputPort.getIdentifier().equals(outputPortStatus.getId())).findFirst().orElse(null), + outputPortStatus -> createPortStatusDto(outputPortStatus) ); final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPort.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2110,8 +2106,8 @@ public final class DtoFactory { final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier())); final PermissionsDTO permissions = createPermissionsDto(processGroup); final ProcessGroupStatusDTO status = getComponentStatus( - () -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> processGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null), - processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus) + () -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> processGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null), + processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus) ); final List<BulletinEntity> bulletins = getProcessGroupBulletins.apply(processGroup); flow.getProcessGroups().add(entityFactory.createProcessGroupEntity(dto, revision, permissions, status, bulletins)); @@ -2126,8 +2122,8 @@ public final class DtoFactory { final PermissionsDTO permissions = createPermissionsDto(processor); final PermissionsDTO operatePermissions = createPermissionsDto(new OperationAuthorizable(processor)); final ProcessorStatusDTO status = getComponentStatus( - () -> groupStatus.getProcessorStatus().stream().filter(processorStatus -> processor.getIdentifier().equals(processorStatus.getId())).findFirst().orElse(null), - processorStatus -> createProcessorStatusDto(processorStatus) + () -> groupStatus.getProcessorStatus().stream().filter(processorStatus -> processor.getIdentifier().equals(processorStatus.getId())).findFirst().orElse(null), + processorStatus -> createProcessorStatusDto(processorStatus) ); final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2143,8 +2139,8 @@ public final class DtoFactory { final PermissionsDTO permissions = createPermissionsDto(remoteProcessGroup); final PermissionsDTO operatePermissions = createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); final RemoteProcessGroupStatusDTO status = getComponentStatus( - () -> groupStatus.getRemoteProcessGroupStatus().stream().filter(rpgStatus -> remoteProcessGroup.getIdentifier().equals(rpgStatus.getId())).findFirst().orElse(null), - remoteProcessGroupStatus -> createRemoteProcessGroupStatusDto(remoteProcessGroup, remoteProcessGroupStatus) + () -> groupStatus.getRemoteProcessGroupStatus().stream().filter(rpgStatus -> remoteProcessGroup.getIdentifier().equals(rpgStatus.getId())).findFirst().orElse(null), + remoteProcessGroupStatus -> createRemoteProcessGroupStatusDto(remoteProcessGroup, remoteProcessGroupStatus) ); final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2174,8 +2170,8 @@ public final class DtoFactory { final PermissionsDTO permissions = createPermissionsDto(procNode); final PermissionsDTO operatePermissions = createPermissionsDto(new OperationAuthorizable(procNode)); final ProcessorStatusDTO status = getComponentStatus( - () -> groupStatus.getProcessorStatus().stream().filter(processorStatus -> procNode.getIdentifier().equals(processorStatus.getId())).findFirst().orElse(null), - processorStatus -> createProcessorStatusDto(processorStatus) + () -> groupStatus.getProcessorStatus().stream().filter(processorStatus -> procNode.getIdentifier().equals(processorStatus.getId())).findFirst().orElse(null), + processorStatus -> createProcessorStatusDto(processorStatus) ); final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(procNode.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2186,8 +2182,8 @@ public final class DtoFactory { final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(connNode.getIdentifier())); final PermissionsDTO permissions = createPermissionsDto(connNode); final ConnectionStatusDTO status = getComponentStatus( - () -> groupStatus.getConnectionStatus().stream().filter(connectionStatus -> connNode.getIdentifier().equals(connectionStatus.getId())).findFirst().orElse(null), - connectionStatus -> createConnectionStatusDto(connectionStatus) + () -> groupStatus.getConnectionStatus().stream().filter(connectionStatus -> connNode.getIdentifier().equals(connectionStatus.getId())).findFirst().orElse(null), + connectionStatus -> createConnectionStatusDto(connectionStatus) ); dto.getConnections().add(entityFactory.createConnectionEntity(createConnectionDto(connNode), revision, permissions, status)); } @@ -2208,8 +2204,8 @@ public final class DtoFactory { final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(childGroup.getIdentifier())); final PermissionsDTO permissions = createPermissionsDto(childGroup); final ProcessGroupStatusDTO status = getComponentStatus( - () -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> childGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null), - processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus) + () -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> childGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null), + processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus) ); final List<BulletinEntity> bulletins = getProcessGroupBulletins.apply(childGroup); dto.getProcessGroups().add(entityFactory.createProcessGroupEntity(createProcessGroupDto(childGroup), revision, permissions, status, bulletins)); @@ -2220,8 +2216,8 @@ public final class DtoFactory { final PermissionsDTO permissions = createPermissionsDto(rpg); final PermissionsDTO operatePermissions = createPermissionsDto(new OperationAuthorizable(rpg)); final RemoteProcessGroupStatusDTO status = getComponentStatus( - () -> groupStatus.getRemoteProcessGroupStatus().stream().filter(remoteProcessGroupStatus -> rpg.getIdentifier().equals(remoteProcessGroupStatus.getId())).findFirst().orElse(null), - remoteProcessGroupStatus -> createRemoteProcessGroupStatusDto(rpg, remoteProcessGroupStatus) + () -> groupStatus.getRemoteProcessGroupStatus().stream().filter(remoteProcessGroupStatus -> rpg.getIdentifier().equals(remoteProcessGroupStatus.getId())).findFirst().orElse(null), + remoteProcessGroupStatus -> createRemoteProcessGroupStatusDto(rpg, remoteProcessGroupStatus) ); final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2233,8 +2229,8 @@ public final class DtoFactory { final PermissionsDTO permissions = createPermissionsDto(inputPort); final PermissionsDTO operatePermissions = createPermissionsDto(new OperationAuthorizable(inputPort)); final PortStatusDTO status = getComponentStatus( - () -> groupStatus.getInputPortStatus().stream().filter(inputPortStatus -> inputPort.getIdentifier().equals(inputPortStatus.getId())).findFirst().orElse(null), - inputPortStatus -> createPortStatusDto(inputPortStatus) + () -> groupStatus.getInputPortStatus().stream().filter(inputPortStatus -> inputPort.getIdentifier().equals(inputPortStatus.getId())).findFirst().orElse(null), + inputPortStatus -> createPortStatusDto(inputPortStatus) ); final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPort.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2246,8 +2242,8 @@ public final class DtoFactory { final PermissionsDTO permissions = createPermissionsDto(outputPort); final PermissionsDTO operatePermissions = createPermissionsDto(new OperationAuthorizable(outputPort)); final PortStatusDTO status = getComponentStatus( - () -> groupStatus.getOutputPortStatus().stream().filter(outputPortStatus -> outputPort.getIdentifier().equals(outputPortStatus.getId())).findFirst().orElse(null), - outputPortStatus -> createPortStatusDto(outputPortStatus) + () -> groupStatus.getOutputPortStatus().stream().filter(outputPortStatus -> outputPort.getIdentifier().equals(outputPortStatus.getId())).findFirst().orElse(null), + outputPortStatus -> createPortStatusDto(outputPortStatus) ); final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPort.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2290,7 +2286,7 @@ public final class DtoFactory { dto.setVersionControlInformation(createVersionControlInformationDto(group)); final Map<String, String> variables = group.getVariableRegistry().getVariableMap().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue())); + .collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue())); dto.setVariables(variables); final ProcessGroup parentGroup = group.getParent(); @@ -2409,50 +2405,50 @@ public final class DtoFactory { mapping.put(group.getInstanceId(), group.getIdentifier()); group.getProcessors().stream() - .map(proc -> (InstantiatedVersionedProcessor) proc) - .forEach(proc -> mapping.put(proc.getInstanceId(), proc.getIdentifier())); + .map(proc -> (InstantiatedVersionedProcessor) proc) + .forEach(proc -> mapping.put(proc.getInstanceId(), proc.getIdentifier())); group.getFunnels().stream() - .map(funnel -> (InstantiatedVersionedFunnel) funnel) - .forEach(funnel -> mapping.put(funnel.getInstanceId(), funnel.getIdentifier())); + .map(funnel -> (InstantiatedVersionedFunnel) funnel) + .forEach(funnel -> mapping.put(funnel.getInstanceId(), funnel.getIdentifier())); group.getInputPorts().stream() - .map(port -> (InstantiatedVersionedPort) port) - .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + .map(port -> (InstantiatedVersionedPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); group.getOutputPorts().stream() - .map(port -> (InstantiatedVersionedPort) port) - .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + .map(port -> (InstantiatedVersionedPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); group.getControllerServices().stream() - .map(service -> (InstantiatedVersionedControllerService) service) - .forEach(service -> mapping.put(service.getInstanceId(), service.getIdentifier())); + .map(service -> (InstantiatedVersionedControllerService) service) + .forEach(service -> mapping.put(service.getInstanceId(), service.getIdentifier())); group.getLabels().stream() - .map(label -> (InstantiatedVersionedLabel) label) - .forEach(label -> mapping.put(label.getInstanceId(), label.getIdentifier())); + .map(label -> (InstantiatedVersionedLabel) label) + .forEach(label -> mapping.put(label.getInstanceId(), label.getIdentifier())); group.getConnections().stream() - .map(conn -> (InstantiatedVersionedConnection) conn) - .forEach(conn -> mapping.put(conn.getInstanceId(), conn.getIdentifier())); + .map(conn -> (InstantiatedVersionedConnection) conn) + .forEach(conn -> mapping.put(conn.getInstanceId(), conn.getIdentifier())); group.getRemoteProcessGroups().stream() - .map(rpg -> (InstantiatedVersionedRemoteProcessGroup) rpg) - .forEach(rpg -> { - mapping.put(rpg.getInstanceId(), rpg.getIdentifier()); - - if (rpg.getInputPorts() != null) { - rpg.getInputPorts().stream() - .map(port -> (InstantiatedVersionedRemoteGroupPort) port) - .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); - } + .map(rpg -> (InstantiatedVersionedRemoteProcessGroup) rpg) + .forEach(rpg -> { + mapping.put(rpg.getInstanceId(), rpg.getIdentifier()); + + if (rpg.getInputPorts() != null) { + rpg.getInputPorts().stream() + .map(port -> (InstantiatedVersionedRemoteGroupPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + } - if (rpg.getOutputPorts() != null) { - rpg.getOutputPorts().stream() - .map(port -> (InstantiatedVersionedRemoteGroupPort) port) - .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); - } - }); + if (rpg.getOutputPorts() != null) { + rpg.getOutputPorts().stream() + .map(port -> (InstantiatedVersionedRemoteGroupPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + } + }); group.getProcessGroups().stream() - .map(child -> (InstantiatedVersionedProcessGroup) child) - .forEach(child -> { - final Map<String, String> childMapping = createVersionControlComponentMappingDto(child); - mapping.putAll(childMapping); - }); + .map(child -> (InstantiatedVersionedProcessGroup) child) + .forEach(child -> { + final Map<String, String> childMapping = createVersionControlComponentMappingDto(child); + mapping.putAll(childMapping); + }); return mapping; } @@ -2574,8 +2570,8 @@ public final class DtoFactory { final ComponentVariableRegistry variableRegistry = processGroup.getVariableRegistry(); final List<String> variableNames = variableRegistry.getVariableMap().keySet().stream() - .map(descriptor -> descriptor.getName()) - .collect(Collectors.toList()); + .map(descriptor -> descriptor.getName()) + .collect(Collectors.toList()); final Set<VariableEntity> variableEntities = new LinkedHashSet<>(); @@ -3278,31 +3274,31 @@ public final class DtoFactory { * @return ProcessorDiagnosticsDTO for the given Processor */ public ProcessorDiagnosticsDTO createProcessorDiagnosticsDto(final ProcessorNode procNode, final ProcessorStatus procStatus, final BulletinRepository bulletinRepo, - final FlowController flowController, final Function<String, ControllerServiceEntity> serviceEntityFactory) { + final FlowController flowController, final Function<String, ControllerServiceEntity> serviceEntityFactory) { final ProcessorDiagnosticsDTO procDiagnostics = new ProcessorDiagnosticsDTO(); procDiagnostics.setClassLoaderDiagnostics(createClassLoaderDiagnosticsDto(procNode)); procDiagnostics.setIncomingConnections(procNode.getIncomingConnections().stream() - .map(this::createConnectionDiagnosticsDto) - .collect(Collectors.toSet())); + .map(this::createConnectionDiagnosticsDto) + .collect(Collectors.toSet())); procDiagnostics.setOutgoingConnections(procNode.getConnections().stream() - .map(this::createConnectionDiagnosticsDto) - .collect(Collectors.toSet())); + .map(this::createConnectionDiagnosticsDto) + .collect(Collectors.toSet())); procDiagnostics.setJvmDiagnostics(createJvmDiagnosticsDto(flowController)); procDiagnostics.setProcessor(createProcessorDto(procNode)); procDiagnostics.setProcessorStatus(createProcessorStatusDto(procStatus)); procDiagnostics.setThreadDumps(createThreadDumpDtos(procNode)); final Set<ControllerServiceDiagnosticsDTO> referencedServiceDiagnostics = createReferencedServiceDiagnostics(procNode.getProperties(), - flowController.getControllerServiceProvider(), serviceEntityFactory); + flowController.getControllerServiceProvider(), serviceEntityFactory); procDiagnostics.setReferencedControllerServices(referencedServiceDiagnostics); return procDiagnostics; } private Set<ControllerServiceDiagnosticsDTO> createReferencedServiceDiagnostics(final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider, - final Function<String, ControllerServiceEntity> serviceEntityFactory) { + final Function<String, ControllerServiceEntity> serviceEntityFactory) { final Set<ControllerServiceDiagnosticsDTO> referencedServiceDiagnostics = new HashSet<>(); for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { @@ -3339,7 +3335,7 @@ public final class DtoFactory { * @return ControllerServiceDiagnosticsDTO for the given Controller Service */ public ControllerServiceDiagnosticsDTO createControllerServiceDiagnosticsDto(final ControllerServiceNode serviceNode, final Function<String, ControllerServiceEntity> serviceEntityFactory, - final ControllerServiceProvider serviceProvider) { + final ControllerServiceProvider serviceProvider) { final ControllerServiceDiagnosticsDTO serviceDiagnostics = new ControllerServiceDiagnosticsDTO(); final ControllerServiceEntity serviceEntity = serviceEntityFactory.apply(serviceNode.getIdentifier()); @@ -3409,8 +3405,8 @@ public final class DtoFactory { final List<RemoteQueuePartitionDiagnostics> remoteDiagnostics = queueDiagnostics.getRemoteQueuePartitionDiagnostics(); if (remoteDiagnostics != null) { final List<RemoteQueuePartitionDTO> remoteDiagnosticsDtos = remoteDiagnostics.stream() - .map(this::createRemoteQueuePartitionDto) - .collect(Collectors.toList()); + .map(this::createRemoteQueuePartitionDto) + .collect(Collectors.toList()); dto.setRemoteQueuePartitions(remoteDiagnosticsDtos); } @@ -3490,10 +3486,10 @@ public final class DtoFactory { // flow-related information final Set<BundleDTO> bundlesLoaded = extensionManager.getAllBundles().stream() - .map(bundle -> bundle.getBundleDetails().getCoordinate()) - .sorted((a, b) -> a.getCoordinate().compareTo(b.getCoordinate())) - .map(this::createBundleDto) - .collect(Collectors.toCollection(LinkedHashSet::new)); + .map(bundle -> bundle.getBundleDetails().getCoordinate()) + .sorted((a, b) -> a.getCoordinate().compareTo(b.getCoordinate())) + .map(this::createBundleDto) + .collect(Collectors.toCollection(LinkedHashSet::new)); flowDiagnosticsDto.setActiveEventDrivenThreads(flowController.getActiveEventDrivenThreadCount()); flowDiagnosticsDto.setActiveTimerDrivenThreads(flowController.getActiveTimerDrivenThreadCount()); diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/FlowConfigurationDTO.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/FlowConfigurationDTO.java deleted file mode 100644 index 2dd91ad..0000000 --- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/FlowConfigurationDTO.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Modifications to the original nifi code for the ONAP project are made - * available under the Apache License, Version 2.0 - */ -package org.apache.nifi.web.api.dto; - -import io.swagger.annotations.ApiModelProperty; -import org.apache.nifi.web.api.dto.util.TimeAdapter; - -import javax.xml.bind.annotation.XmlType; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import java.util.Date; - -/** - * Details for the controller configuration. - */ -@XmlType(name = "flowConfiguration") -public class FlowConfigurationDTO { - - private Boolean supportsManagedAuthorizer; - private Boolean supportsConfigurableAuthorizer; - private Boolean supportsConfigurableUsersAndGroups; - private Long autoRefreshIntervalSeconds; - - private Date currentTime; - private Integer timeOffset; - - private Long defaultBackPressureObjectThreshold; - private String defaultBackPressureDataSizeThreshold; - - private String dcaeDistributorApiHostname; - - /** - * @author Renu - * @return getter and setter for dcae distributor api hostname. This value is read only - */ - @ApiModelProperty( - value = "Whether it picks up configurable host address.", - readOnly = true - ) - public String getDcaeDistributorApiHostname() { - return dcaeDistributorApiHostname; - } - - public void setDcaeDistributorApiHostname(String dcaeDistributorApiHostname) { - this.dcaeDistributorApiHostname = dcaeDistributorApiHostname; - } - - /** - * @return interval in seconds between the automatic NiFi refresh requests. This value is read only - */ - @ApiModelProperty( - value = "The interval in seconds between the automatic NiFi refresh requests.", - readOnly = true - ) - public Long getAutoRefreshIntervalSeconds() { - return autoRefreshIntervalSeconds; - } - - public void setAutoRefreshIntervalSeconds(Long autoRefreshIntervalSeconds) { - this.autoRefreshIntervalSeconds = autoRefreshIntervalSeconds; - } - - /** - * @return whether this NiFi supports a managed authorizer. Managed authorizers can visualize users, groups, - * and policies in the UI. This value is read only - */ - @ApiModelProperty( - value = "Whether this NiFi supports a managed authorizer. Managed authorizers can visualize users, groups, and policies in the UI.", - readOnly = true - ) - public Boolean getSupportsManagedAuthorizer() { - return supportsManagedAuthorizer; - } - - public void setSupportsManagedAuthorizer(Boolean supportsManagedAuthorizer) { - this.supportsManagedAuthorizer = supportsManagedAuthorizer; - } - - /** - * @return whether this NiFi supports configurable users and groups. This value is read only - */ - @ApiModelProperty( - value = "Whether this NiFi supports configurable users and groups.", - readOnly = true - ) - public Boolean getSupportsConfigurableUsersAndGroups() { - return supportsConfigurableUsersAndGroups; - } - - public void setSupportsConfigurableUsersAndGroups(Boolean supportsConfigurableUsersAndGroups) { - this.supportsConfigurableUsersAndGroups = supportsConfigurableUsersAndGroups; - } - - /** - * @return whether this NiFi supports a configurable authorizer. This value is read only - */ - @ApiModelProperty( - value = "Whether this NiFi supports a configurable authorizer.", - readOnly = true - ) - public Boolean getSupportsConfigurableAuthorizer() { - return supportsConfigurableAuthorizer; - } - - public void setSupportsConfigurableAuthorizer(Boolean supportsConfigurableAuthorizer) { - this.supportsConfigurableAuthorizer = supportsConfigurableAuthorizer; - } - - /** - * @return current time on the server - */ - @XmlJavaTypeAdapter(TimeAdapter.class) - @ApiModelProperty( - value = "The current time on the system.", - dataType = "string" - ) - public Date getCurrentTime() { - return currentTime; - } - - public void setCurrentTime(Date currentTime) { - this.currentTime = currentTime; - } - - /** - * @return time offset of the server - */ - @ApiModelProperty( - value = "The time offset of the system." - ) - public Integer getTimeOffset() { - return timeOffset; - } - - public void setTimeOffset(Integer timeOffset) { - this.timeOffset = timeOffset; - } - - /** - * @return the default back pressure object threshold - */ - @ApiModelProperty( - value = "The default back pressure object threshold." - ) - public Long getDefaultBackPressureObjectThreshold() { - return defaultBackPressureObjectThreshold; - } - - public void setDefaultBackPressureObjectThreshold(Long backPressureObjectThreshold) { - this.defaultBackPressureObjectThreshold = backPressureObjectThreshold; - } - - /** - * @return the default back pressure data size threshold - */ - @ApiModelProperty( - value = "The default back pressure data size threshold." - ) - public String getDefaultBackPressureDataSizeThreshold() { - return defaultBackPressureDataSizeThreshold; - } - - public void setDefaultBackPressureDataSizeThreshold(String backPressureDataSizeThreshold) { - this.defaultBackPressureDataSizeThreshold = backPressureDataSizeThreshold; - } -} |