From 9507f2f8d2ec616f01f5ee8825106300b95e8ddc Mon Sep 17 00:00:00 2001 From: Andrew Gauld Date: Fri, 7 Feb 2020 15:00:39 +0000 Subject: Add DCAE MOD design tool project Change-Id: I660b28ebfaa7e4b5f03a1df5fd17d126f58b7c14 Issue-ID: DCAEGEN2-1860 Signed-off-by: Andrew Gauld --- .../src/main/java/org/apache/nifi/NiFi.java | 446 ++ .../org/apache/nifi/controller/AbstractPort.java | 675 +++ .../java/org/apache/nifi/nar/DCAEAutoLoader.java | 105 + .../java/org/apache/nifi/nar/DCAEClassLoaders.java | 127 + .../java/org/apache/nifi/util/NiFiProperties.java | 1551 +++++++ .../apache/nifi/web/StandardNiFiServiceFacade.java | 4899 ++++++++++++++++++++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 4354 +++++++++++++++++ .../nifi/web/api/dto/FlowConfigurationDTO.java | 182 + .../nifi/web/dao/impl/StandardConnectionDAO.java | 700 +++ .../org/apache/nifi/web/server/JettyServer.java | 1226 +++++ 10 files changed, 14265 insertions(+) create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEClassLoaders.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/api/dto/FlowConfigurationDTO.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/server/JettyServer.java (limited to 'mod/designtool/designtool-web/src/main/java/org') diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java new file mode 100644 index 0000000..0b033db --- /dev/null +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/NiFi.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications to the original nifi code for the ONAP project are made + * available under the Apache License, Version 2.0 + */ +package org.apache.nifi; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.nar.ExtensionMapping; +import org.apache.nifi.nar.NarClassLoaders; +import org.apache.nifi.nar.NarClassLoadersHolder; +import org.apache.nifi.nar.NarUnpacker; +import org.apache.nifi.nar.SystemBundle; +import org.apache.nifi.util.FileUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class NiFi { + + private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class); + private static final String KEY_FILE_FLAG = "-K"; + private final NiFiServer nifiServer; + private final BootstrapListener bootstrapListener; + + public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; + private volatile boolean shutdown = false; + + public NiFi(final NiFiProperties properties) + throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { + + this(properties, ClassLoader.getSystemClassLoader()); + + } + + public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader) + throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { + + // There can only be one krb5.conf for the overall Java process so set this globally during + // start up so that processors and our Kerberos authentication code don't have to set this + final File kerberosConfigFile = properties.getKerberosConfigurationFile(); + if (kerberosConfigFile != null) { + final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath(); + LOGGER.info("Setting java.security.krb5.conf to {}", new Object[]{kerberosConfigFilePath}); + System.setProperty("java.security.krb5.conf", kerberosConfigFilePath); + } + + setDefaultUncaughtExceptionHandler(); + + // register the shutdown hook + addShutdownHook(); + + final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY); + if (bootstrapPort != null) { + try { + final int port = Integer.parseInt(bootstrapPort); + + if (port < 1 || port > 65535) { + throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); + } + + bootstrapListener = new BootstrapListener(this, port); + bootstrapListener.start(); + } catch (final NumberFormatException nfe) { + throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); + } + } else { + LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap"); + bootstrapListener = null; + } + + // delete the web working dir - if the application does not start successfully + // the web app directories might be in an invalid state. when this happens + // jetty will not attempt to re-extract the war into the directory. by removing + // the working directory, we can be assured that it will attempt to extract the + // war every time the application starts. + File webWorkingDir = properties.getWebWorkingDirectory(); + FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true); + FileUtils.deleteFile(webWorkingDir, LOGGER, 3); + + detectTimingIssues(); + + // redirect JUL log events + initLogging(); + + final Bundle systemBundle = SystemBundle.create(properties); + + // expand the nars + final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle); + + // load the extensions classloaders + NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance(); + + narClassLoaders.init(rootClassLoader, + properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory()); + + // load the framework classloader + final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader(); + if (frameworkClassLoader == null) { + throw new IllegalStateException("Unable to find the framework NAR ClassLoader."); + } + + final Set narBundles = narClassLoaders.getBundles(); + + // load the server from the framework classloader + Thread.currentThread().setContextClassLoader(frameworkClassLoader); + Class jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader); + Constructor jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class); + + final long startTime = System.nanoTime(); + nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles); + nifiServer.setExtensionMapping(extensionMapping); + nifiServer.setBundles(systemBundle, narBundles); + + if (shutdown) { + LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller"); + } else { + nifiServer.start(); + + if (bootstrapListener != null) { + bootstrapListener.sendStartedStatus(true); + } + + final long duration = System.nanoTime() - startTime; + LOGGER.info("Controller initialization took " + duration + " nanoseconds " + + "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds)."); + } + } + + protected void setDefaultUncaughtExceptionHandler() { + Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString()); + LOGGER.error("", e); + } + }); + } + + protected void addShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + // shutdown the jetty server + shutdownHook(); + } + })); + } + + protected void initLogging() { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + } + + private static ClassLoader createBootstrapClassLoader() { + //Get list of files in bootstrap folder + final List urls = new ArrayList<>(); + try { + Files.list(Paths.get("lib/bootstrap")).forEach(p -> { + try { + urls.add(p.toUri().toURL()); + } catch (final MalformedURLException mef) { + LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef); + } + }); + } catch (IOException ioe) { + LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe); + } + //Create the bootstrap classloader + return new URLClassLoader(urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader()); + } + + protected void shutdownHook() { + try { + shutdown(); + } catch (final Throwable t) { + LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t); + } + } + + protected void shutdown() { + this.shutdown = true; + + LOGGER.info("Initiating shutdown of Jetty web server..."); + if (nifiServer != null) { + nifiServer.stop(); + } + if (bootstrapListener != null) { + bootstrapListener.stop(); + } + LOGGER.info("Jetty web server shutdown completed (nicely or otherwise)."); + } + + /** + * Determine if the machine we're running on has timing issues. + */ + private void detectTimingIssues() { + final int minRequiredOccurrences = 25; + final int maxOccurrencesOutOfRange = 15; + final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis()); + + final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(final Runnable r) { + final Thread t = defaultFactory.newThread(r); + t.setDaemon(true); + t.setName("Detect Timing Issues"); + return t; + } + }); + + final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0); + final AtomicInteger occurrences = new AtomicInteger(0); + final Runnable command = new Runnable() { + @Override + public void run() { + final long curMillis = System.currentTimeMillis(); + final long difference = curMillis - lastTriggerMillis.get(); + final long millisOff = Math.abs(difference - 2000L); + occurrences.incrementAndGet(); + if (millisOff > 500L) { + occurrencesOutOfRange.incrementAndGet(); + } + lastTriggerMillis.set(curMillis); + } + }; + + final ScheduledFuture future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS); + + final TimerTask timerTask = new TimerTask() { + @Override + public void run() { + future.cancel(true); + service.shutdownNow(); + + if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) { + LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause " + + "Processors to be scheduled erratically. Please see the NiFi documentation for more information."); + } + } + }; + final Timer timer = new Timer(true); + timer.schedule(timerTask, 60000L); + } + + /** + * Main entry point of the application. + * + * @param args things which are ignored + */ + public static void main(String[] args) { + LOGGER.info("Launching NiFi..."); + try { + NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args); + new NiFi(properties); + } catch (final Throwable t) { + LOGGER.error("Failure to launch NiFi due to " + t, t); + } + } + + protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) { + final ClassLoader bootstrap = createBootstrapClassLoader(); + NiFiProperties properties = initializeProperties(args, bootstrap); + properties.validate(); + return properties; + } + + private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) { + // Try to get key + // If key doesn't exist, instantiate without + // Load properties + // If properties are protected and key missing, throw RuntimeException + + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + final String key; + try { + key = loadFormattedKey(args); + // The key might be empty or null when it is passed to the loader + } catch (IllegalArgumentException e) { + final String msg = "The bootstrap process did not provide a valid key"; + throw new IllegalArgumentException(msg, e); + } + Thread.currentThread().setContextClassLoader(boostrapLoader); + + try { + final Class propsLoaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader); + final Method withKeyMethod = propsLoaderClass.getMethod("withKey", String.class); + final Object loaderInstance = withKeyMethod.invoke(null, key); + final Method getMethod = propsLoaderClass.getMethod("get"); + final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance); + LOGGER.info("Loaded {} properties", properties.size()); + return properties; + } catch (InvocationTargetException wrappedException) { + final String msg = "There was an issue decrypting protected properties"; + throw new IllegalArgumentException(msg, wrappedException.getCause() == null ? wrappedException : wrappedException.getCause()); + } catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) { + final String msg = "Unable to access properties loader in the expected manner - apparent classpath or build issue"; + throw new IllegalArgumentException(msg, reex); + } catch (final RuntimeException e) { + final String msg = "There was an issue decrypting protected properties"; + throw new IllegalArgumentException(msg, e); + } finally { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + + private static String loadFormattedKey(String[] args) { + String key = null; + List parsedArgs = parseArgs(args); + // Check if args contain protection key + if (parsedArgs.contains(KEY_FILE_FLAG)) { + key = getKeyFromKeyFileAndPrune(parsedArgs); + // Format the key (check hex validity and remove spaces) + key = formatHexKey(key); + + } + + if (null == key) { + return ""; + } else if (!isHexKeyValid(key)) { + throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length"); + } else { + return key; + } + } + + private static String getKeyFromKeyFileAndPrune(List parsedArgs) { + String key = null; + LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag"); + int i = parsedArgs.indexOf(KEY_FILE_FLAG); + if (parsedArgs.size() <= i + 1) { + LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG); + throw new IllegalArgumentException("The bootstrap process provided the " + KEY_FILE_FLAG + " flag but no key"); + } + try { + String passwordfile_path = parsedArgs.get(i + 1); + // Slurp in the contents of the file: + byte[] encoded = Files.readAllBytes(Paths.get(passwordfile_path)); + key = new String(encoded,StandardCharsets.UTF_8); + if (0 == key.length()) + throw new IllegalArgumentException("Key in keyfile " + passwordfile_path + " yielded an empty key"); + + LOGGER.info("Now overwriting file in "+passwordfile_path); + + // Overwrite the contents of the file (to avoid littering file system + // unlinked with key material): + File password_file = new File(passwordfile_path); + FileWriter overwriter = new FileWriter(password_file,false); + + // Construe a random pad: + Random r = new Random(); + StringBuffer sb = new StringBuffer(); + // Note on correctness: this pad is longer, but equally sufficient. + while(sb.length() < encoded.length){ + sb.append(Integer.toHexString(r.nextInt())); + } + String pad = sb.toString(); + LOGGER.info("Overwriting key material with pad: "+pad); + overwriter.write(pad); + overwriter.close(); + + LOGGER.info("Removing/unlinking file: "+passwordfile_path); + password_file.delete(); + + } catch (IOException e) { + LOGGER.error("Caught IOException while retrieving the "+KEY_FILE_FLAG+"-passed keyfile; aborting: "+e.toString()); + System.exit(1); + } + + LOGGER.info("Read property protection key from key file provided by bootstrap process"); + return key; + } + + private static List parseArgs(String[] args) { + List parsedArgs = new ArrayList<>(Arrays.asList(args)); + for (int i = 0; i < parsedArgs.size(); i++) { + if (parsedArgs.get(i).startsWith(KEY_FILE_FLAG + " ")) { + String[] split = parsedArgs.get(i).split(" ", 2); + parsedArgs.set(i, split[0]); + parsedArgs.add(i + 1, split[1]); + break; + } + } + return parsedArgs; + } + + private static String formatHexKey(String input) { + if (input == null || input.trim().isEmpty()) { + return ""; + } + return input.replaceAll("[^0-9a-fA-F]", "").toLowerCase(); + } + + private static boolean isHexKeyValid(String key) { + if (key == null || key.trim().isEmpty()) { + return false; + } + // Key length is in "nibbles" (i.e. one hex char = 4 bits) + return Arrays.asList(128, 196, 256).contains(key.length() * 4) && key.matches("^[0-9a-fA-F]*$"); + } +} diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java new file mode 100644 index 0000000..6023fc2 --- /dev/null +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications to the original nifi code for the ONAP project are made + * available under the Apache License, Version 2.0 + */ +package org.apache.nifi.controller; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.CharacterFilterUtils; +import org.apache.nifi.util.FormatUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static java.util.Objects.requireNonNull; + +public abstract class AbstractPort implements Port { + + public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder() + .description("The relationship through which all Flow Files are transferred") + .name("") + .build(); + + public static final long MINIMUM_PENALIZATION_MILLIS = 0L; + public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + + public static final long MINIMUM_YIELD_MILLIS = 0L; + public static final long DEFAULT_YIELD_PERIOD = 10000L; + public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS; + + private final List relationships; + + private final String id; + private final ConnectableType type; + private final AtomicReference name; + private final AtomicReference position; + private final AtomicReference comments; + private final AtomicReference processGroup; + private final AtomicBoolean lossTolerant; + private final AtomicReference scheduledState; + private final AtomicInteger concurrentTaskCount; + private final AtomicReference penalizationPeriod; + private final AtomicReference yieldPeriod; + private final AtomicReference schedulingPeriod; + private final AtomicReference versionedComponentId = new AtomicReference<>(); + private final AtomicLong schedulingNanos; + private final AtomicLong yieldExpiration; + private final ProcessScheduler processScheduler; + + private final Set outgoingConnections; + private final List incomingConnections; + + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) { + this.id = requireNonNull(id); + this.name = new AtomicReference<>(requireNonNull(name)); + position = new AtomicReference<>(new Position(0D, 0D)); + outgoingConnections = new HashSet<>(); + incomingConnections = new ArrayList<>(); + comments = new AtomicReference<>(); + lossTolerant = new AtomicBoolean(false); + concurrentTaskCount = new AtomicInteger(1); + processScheduler = scheduler; + + final List relationshipList = new ArrayList<>(); + relationshipList.add(PORT_RELATIONSHIP); + relationships = Collections.unmodifiableList(relationshipList); + this.processGroup = new AtomicReference<>(processGroup); + this.type = type; + penalizationPeriod = new AtomicReference<>("30 sec"); + yieldPeriod = new AtomicReference<>("1 sec"); + yieldExpiration = new AtomicLong(0L); + schedulingPeriod = new AtomicReference<>("0 millis"); + schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS); + scheduledState = new AtomicReference<>(ScheduledState.STOPPED); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public String getProcessGroupIdentifier() { + final ProcessGroup procGroup = getProcessGroup(); + return procGroup == null ? null : procGroup.getIdentifier(); + } + + @Override + public String getName() { + return name.get(); + } + + @Override + public void setName(final String name) { + if (this.name.get().equals(name)) { + return; + } + + final ProcessGroup parentGroup = this.processGroup.get(); + if (getConnectableType() == ConnectableType.INPUT_PORT) { + if (parentGroup.getInputPortByName(name) != null) { + throw new IllegalStateException("The requested new port name is not available"); + } + } else if (getConnectableType() == ConnectableType.OUTPUT_PORT) { + if (parentGroup.getOutputPortByName(name) != null) { + throw new IllegalStateException("The requested new port name is not available"); + } + } + + this.name.set(name); + } + + @Override + public Authorizable getParentAuthorizable() { + return getProcessGroup(); + } + + @Override + public Resource getResource() { + final ResourceType resourceType = ConnectableType.INPUT_PORT.equals(getConnectableType()) ? ResourceType.InputPort : ResourceType.OutputPort; + return ResourceFactory.getComponentResource(resourceType, getIdentifier(), getName()); + } + + @Override + public ProcessGroup getProcessGroup() { + return processGroup.get(); + } + + @Override + public void setProcessGroup(final ProcessGroup newGroup) { + this.processGroup.set(newGroup); + } + + @Override + public String getComments() { + return comments.get(); + } + + @Override + public void setComments(final String comments) { + this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments)); + } + + @Override + public Collection getRelationships() { + return relationships; + } + + @Override + public Relationship getRelationship(final String relationshipName) { + if (PORT_RELATIONSHIP.getName().equals(relationshipName)) { + return PORT_RELATIONSHIP; + } + return null; + } + + @Override + public void addConnection(final Connection connection) throws IllegalArgumentException { + writeLock.lock(); + try { + if (!requireNonNull(connection).getSource().equals(this)) { + if (connection.getDestination().equals(this)) { + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + if (!incomingConnections.contains(connection)) { + incomingConnections.add(connection); + } + + return; + } else { + throw new IllegalArgumentException("Cannot add a connection to a LocalPort for which the LocalPort is neither the Source nor the Destination"); + } + } + + /* TODO: Will commenting this out have repercussions? + Needed to comment this out to allow use of relationships for port to processor case which was previously not supported + for (final Relationship relationship : connection.getRelationships()) { + if (!relationship.equals(PORT_RELATIONSHIP)) { + throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Local Ports"); + } + } + */ + + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + if (!outgoingConnections.contains(connection)) { + outgoingConnections.add(connection); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean hasIncomingConnection() { + readLock.lock(); + try { + return !incomingConnections.isEmpty(); + } finally { + readLock.unlock(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final ProcessSession session = sessionFactory.createSession(); + + try { + onTrigger(context, session); + session.commit(); + } catch (final ProcessException e) { + session.rollback(); + throw e; + } catch (final Throwable t) { + session.rollback(); + throw new RuntimeException(t); + } + } + + public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException; + + @Override + public void updateConnection(final Connection connection) throws IllegalStateException { + if (requireNonNull(connection).getSource().equals(this)) { + writeLock.lock(); + try { + if (!outgoingConnections.remove(connection)) { + throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); + } + outgoingConnections.add(connection); + } finally { + writeLock.unlock(); + } + } else if (connection.getDestination().equals(this)) { + writeLock.lock(); + try { + if (!incomingConnections.remove(connection)) { + throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); + } + incomingConnections.add(connection); + } finally { + writeLock.unlock(); + } + } else { + throw new IllegalStateException("The given connection is not currently registered for this Port"); + } + } + + @Override + public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { + writeLock.lock(); + try { + if (!requireNonNull(connection).getSource().equals(this)) { + final boolean existed = incomingConnections.remove(connection); + if (!existed) { + throw new IllegalStateException("The given connection is not currently registered for this Port"); + } + return; + } + + if (!canConnectionBeRemoved(connection)) { + // TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean + throw new IllegalStateException("Connection " + connection.getIdentifier() + " cannot be removed"); + } + + final boolean removed = outgoingConnections.remove(connection); + if (!removed) { + throw new IllegalStateException("Connection " + connection.getIdentifier() + " is not registered with " + this.getIdentifier()); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Verify that removing this connection will not prevent this Port from + * still being connected via each relationship + * + * @param connection to test for removal + * @return true if can be removed + */ + private boolean canConnectionBeRemoved(final Connection connection) { + final Connectable source = connection.getSource(); + if (!source.isRunning()) { + // we don't have to verify that this Connectable is still connected because it's okay to make + // the source invalid since it is not running. + return true; + } + + for (final Relationship relationship : source.getRelationships()) { + if (source.isAutoTerminated(relationship)) { + continue; + } + + final Set connectionsForRelationship = source.getConnections(relationship); + if (connectionsForRelationship == null || connectionsForRelationship.isEmpty()) { + return false; + } + } + + return true; + } + + @Override + public Set getConnections() { + readLock.lock(); + try { + return Collections.unmodifiableSet(outgoingConnections); + } finally { + readLock.unlock(); + } + } + + @Override + public Set getConnections(final Relationship relationship) { + readLock.lock(); + try { + if (relationship.equals(PORT_RELATIONSHIP)) { + return Collections.unmodifiableSet(outgoingConnections); + } + + throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Local Ports"); + } finally { + readLock.unlock(); + } + } + + @Override + public Position getPosition() { + return position.get(); + } + + @Override + public void setPosition(final Position position) { + this.position.set(position); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString(); + } + + @Override + public List getIncomingConnections() { + readLock.lock(); + try { + return Collections.unmodifiableList(incomingConnections); + } finally { + readLock.unlock(); + } + } + + @Override + public abstract boolean isValid(); + + @Override + public boolean isAutoTerminated(final Relationship relationship) { + return false; + } + + @Override + public boolean isLossTolerant() { + return lossTolerant.get(); + } + + @Override + public void setLossTolerant(boolean lossTolerant) { + this.lossTolerant.set(lossTolerant); + } + + @Override + public void setMaxConcurrentTasks(final int taskCount) { + if (taskCount < 1) { + throw new IllegalArgumentException(); + } + concurrentTaskCount.set(taskCount); + } + + @Override + public int getMaxConcurrentTasks() { + return concurrentTaskCount.get(); + } + + @Override + public void shutdown() { + scheduledState.set(ScheduledState.STOPPED); + } + + @Override + public void onSchedulingStart() { + scheduledState.set(ScheduledState.RUNNING); + } + + public void disable() { + final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED); + if (!updated) { + throw new IllegalStateException("Port cannot be disabled because it is not stopped"); + } + } + + public void enable() { + final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED); + if (!updated) { + throw new IllegalStateException("Port cannot be enabled because it is not disabled"); + } + } + + @Override + public boolean isRunning() { + return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; + } + + @Override + public ScheduledState getScheduledState() { + return scheduledState.get(); + } + + @Override + public ConnectableType getConnectableType() { + return type; + } + + @Override + public void setYieldPeriod(final String yieldPeriod) { + final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); + if (yieldMillis < 0) { + throw new IllegalArgumentException("Yield duration must be positive"); + } + this.yieldPeriod.set(yieldPeriod); + } + + @Override + public void setScheduldingPeriod(final String schedulingPeriod) { + final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); + if (schedulingNanos < 0) { + throw new IllegalArgumentException("Scheduling Period must be positive"); + } + + this.schedulingPeriod.set(schedulingPeriod); + this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public String getPenalizationPeriod() { + return penalizationPeriod.get(); + } + + @Override + public void yield() { + final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); + yield(yieldMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void yield(final long yieldDuration, final TimeUnit timeUnit) { + final long yieldMillis = timeUnit.toMillis(yieldDuration); + yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); + } + + @Override + public long getYieldExpiration() { + return yieldExpiration.get(); + } + + @Override + public long getSchedulingPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); + } + + @Override + public String getSchedulingPeriod() { + return schedulingPeriod.get(); + } + + @Override + public void setPenalizationPeriod(final String penalizationPeriod) { + this.penalizationPeriod.set(penalizationPeriod); + } + + @Override + public String getYieldPeriod() { + return yieldPeriod.get(); + } + + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public void verifyCanDelete() throws IllegalStateException { + verifyCanDelete(false); + } + + @Override + public void verifyCanDelete(final boolean ignoreConnections) { + readLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException(this.getIdentifier() + " is running"); + } + + if (!ignoreConnections) { + for (final Connection connection : outgoingConnections) { + connection.verifyCanDelete(); + } + + for (final Connection connection : incomingConnections) { + if (connection.getSource().equals(this)) { + connection.verifyCanDelete(); + } else { + throw new IllegalStateException(this.getIdentifier() + " is the destination of another component"); + } + } + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanStart() { + readLock.lock(); + try { + switch (scheduledState.get()) { + case DISABLED: + throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled"); + case RUNNING: + throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running"); + case STOPPED: + break; + } + verifyNoActiveThreads(); + + final Collection validationResults = getValidationErrors(); + if (!validationResults.isEmpty()) { + throw new IllegalStateException(this.getIdentifier() + " is not in a valid state: " + validationResults.iterator().next().getExplanation()); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanStop() { + if (getScheduledState() != ScheduledState.RUNNING) { + throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run"); + } + } + + @Override + public void verifyCanUpdate() { + readLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException(this.getIdentifier() + " is not stopped"); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanEnable() { + readLock.lock(); + try { + if (getScheduledState() != ScheduledState.DISABLED) { + throw new IllegalStateException(this.getIdentifier() + " is not disabled"); + } + + verifyNoActiveThreads(); + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanDisable() { + readLock.lock(); + try { + if (getScheduledState() != ScheduledState.STOPPED) { + throw new IllegalStateException(this.getIdentifier() + " is not stopped"); + } + verifyNoActiveThreads(); + } finally { + readLock.unlock(); + } + } + + private void verifyNoActiveThreads() throws IllegalStateException { + final int threadCount = processScheduler.getActiveThreadCount(this); + if (threadCount > 0) { + throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active"); + } + } + + @Override + public void verifyCanClearState() { + } + + @Override + public Optional getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } +} diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java new file mode 100644 index 0000000..ec15ba6 --- /dev/null +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEAutoLoader.java @@ -0,0 +1,105 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.apache.nifi.nar; + +import org.apache.nifi.bundle.Bundle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URL; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; + +/** + * Uses the Java executor service scheduler to continuously load new DCAE jars + */ +public class DCAEAutoLoader { + + private static final Logger LOGGER = LoggerFactory.getLogger(DCAEAutoLoader.class); + + private static final long POLL_INTERVAL_MS = 5000; + + /** + * Runnable task that grabs list of remotely stored jars, identifies ones that haven't + * been processed, builds Nifi bundles for those unprocessed ones and loads them into + * the global extension manager. + */ + private static class LoaderTask implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoaderTask.class); + + private final URI indexJsonDcaeJars; + private final ExtensionDiscoveringManager extensionManager; + private final Set processed = new LinkedHashSet(); + + private LoaderTask(URI indexJsonDcaeJars, ExtensionDiscoveringManager extensionManager) { + this.indexJsonDcaeJars = indexJsonDcaeJars; + this.extensionManager = extensionManager; + } + + @Override + public void run() { + try { + List toProcess = DCAEClassLoaders.getDCAEJarsURLs(this.indexJsonDcaeJars); + toProcess.removeAll(processed); + + if (!toProcess.isEmpty()) { + Set bundles = DCAEClassLoaders.createDCAEBundles(toProcess); + this.extensionManager.discoverExtensions(bundles); + processed.addAll(toProcess); + + LOGGER.info(String.format("#Added DCAE bundles: %d, #Total DCAE bundles: %d ", + bundles.size(), processed.size())); + } + } catch (final Exception e) { + LOGGER.error("Error loading DCAE jars due to: " + e.getMessage(), e); + } + } + } + + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + private ScheduledFuture taskFuture; + + public synchronized void start(URI indexJsonDcaeJars, final ExtensionDiscoveringManager extensionManager) { + // Restricting to a single thread + if (taskFuture != null && !taskFuture.isCancelled()) { + return; + } + + LOGGER.info("Starting DCAE Auto-Loader: {}", new Object[]{indexJsonDcaeJars}); + + LoaderTask task = new LoaderTask(indexJsonDcaeJars, extensionManager); + this.taskFuture = executor.scheduleAtFixedRate(task, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + LOGGER.info("DCAE Auto-Loader started"); + } + + public synchronized void stop() { + if (this.taskFuture != null) { + this.taskFuture.cancel(true); + LOGGER.info("DCAE Auto-Loader stopped"); + } + } + +} diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEClassLoaders.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEClassLoaders.java new file mode 100644 index 0000000..a4dbe77 --- /dev/null +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/nar/DCAEClassLoaders.java @@ -0,0 +1,127 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.apache.nifi.nar; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.bundle.BundleDetails; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.jar.Manifest; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.jar.Attributes; + + +/** + * Class responsible for loading JARs for DCAEProcessors into Nifi + */ +public class DCAEClassLoaders { + + public static class DCAEClassLoadersError extends RuntimeException { + public DCAEClassLoadersError(Throwable e) { + super("Error while using DCAEClassLoaders", e); + } + } + + /** + * Given a URL to a index.json file, fetches the file and generates a list of + * URLs for DCAE jars that has Processors packaged. + * + * @param indexDCAEJars + * @return + */ + public static List getDCAEJarsURLs(URI indexDCAEJars) { + JsonFactory jf = new JsonFactory(); + ObjectMapper om = new ObjectMapper(); + + try { + List urls = om.readValue(jf.createParser(indexDCAEJars.toURL()), List.class); + + return urls.stream().map(u -> { + try { + Map foo = (Map) u; + String name = (String) foo.get("name"); + String url = String.format("%s/%s", indexDCAEJars.toString(), name); + return new URL(url); + } catch (MalformedURLException e) { + // Hopefully you never come here... + return null; + } + }).collect(Collectors.toList()); + } catch (Exception e) { + throw new RuntimeException("Error while getting jar URIs", e); + } + } + + private static BundleDetails createBundleDetails(URLClassLoader classLoader) { + try { + URL url = classLoader.findResource("META-INF/MANIFEST.MF"); + Manifest manifest = new Manifest(url.openStream()); + + final Attributes attributes = manifest.getMainAttributes(); + + final BundleDetails.Builder builder = new BundleDetails.Builder(); + // NOTE: Working directory cannot be null so set it to some bogus dir + // because we aren't really using this. Or maybe should create our own + // working directory + builder.workingDir(new File("/tmp")); + + final String group = attributes.getValue("Group"); + final String id = attributes.getValue("Id"); + final String version = attributes.getValue("Version"); + builder.coordinate(new BundleCoordinate(group, id, version)); + + return builder.build(); + } catch (IOException e) { + throw new DCAEClassLoadersError(e); + } + } + + /** + * From a list of URLs to remote JARs where the JARs contain DCAEProcessor classes, + * create a bundle for each JAR. You will never get a partial list of bundles. + * + * @param jarURLs + * @return + */ + public static Set createDCAEBundles(List jarURLs) { + Set bundles = new HashSet<>(); + + for (URL jarURL : jarURLs) { + URLClassLoader classLoader = new URLClassLoader(new URL[] {jarURL}); + Bundle bundle = new Bundle(createBundleDetails(classLoader), classLoader); + bundles.add(bundle); + } + + return bundles; + } + +} diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java new file mode 100644 index 0000000..3b341ec --- /dev/null +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -0,0 +1,1551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications to the original nifi code for the ONAP project are made + * available under the Apache License, Version 2.0 + */ +package org.apache.nifi.util; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.InvalidPathException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * The NiFiProperties class holds all properties which are needed for various + * values to be available at runtime. It is strongly tied to the startup + * properties needed and is often refer to as the 'nifi.properties' file. The + * properties contains keys and values. Great care should be taken in leveraging + * this class or passing it along. Its use should be refactored and minimized + * over time. + */ +public abstract class NiFiProperties { + + // core properties + public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path"; + public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file"; + public static final String FLOW_CONFIGURATION_ARCHIVE_ENABLED = "nifi.flow.configuration.archive.enabled"; + public static final String FLOW_CONFIGURATION_ARCHIVE_DIR = "nifi.flow.configuration.archive.dir"; + public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_TIME = "nifi.flow.configuration.archive.max.time"; + public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE = "nifi.flow.configuration.archive.max.storage"; + public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_COUNT = "nifi.flow.configuration.archive.max.count"; + public static final String AUTHORIZER_CONFIGURATION_FILE = "nifi.authorizer.configuration.file"; + public static final String LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE = "nifi.login.identity.provider.configuration.file"; + public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory"; + public static final String RESTORE_DIRECTORY = "nifi.restore.directory"; + public static final String WRITE_DELAY_INTERVAL = "nifi.flowservice.writedelay.interval"; + public static final String AUTO_RESUME_STATE = "nifi.flowcontroller.autoResumeState"; + public static final String FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.period"; + public static final String NAR_LIBRARY_DIRECTORY = "nifi.nar.library.directory"; + public static final String NAR_LIBRARY_DIRECTORY_PREFIX = "nifi.nar.library.directory."; + public static final String NAR_LIBRARY_AUTOLOAD_DIRECTORY = "nifi.nar.library.autoload.directory"; + public static final String NAR_WORKING_DIRECTORY = "nifi.nar.working.directory"; + public static final String COMPONENT_DOCS_DIRECTORY = "nifi.documentation.working.directory"; + public static final String SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key"; + public static final String SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm"; + public static final String SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider"; + public static final String H2_URL_APPEND = "nifi.h2.url.append"; + public static final String REMOTE_INPUT_HOST = "nifi.remote.input.host"; + public static final String REMOTE_INPUT_PORT = "nifi.remote.input.socket.port"; + public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure"; + public static final String SITE_TO_SITE_HTTP_ENABLED = "nifi.remote.input.http.enabled"; + public static final String SITE_TO_SITE_HTTP_TRANSACTION_TTL = "nifi.remote.input.http.transaction.ttl"; + public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration"; + public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory"; + public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; + public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; + public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration"; + public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout"; + public static final String BACKPRESSURE_COUNT = "nifi.queue.backpressure.count"; + public static final String BACKPRESSURE_SIZE = "nifi.queue.backpressure.size"; + + // DCAE related config + public static final String DCAE_JARS_INDEX_URL = "nifi.dcae.jars.index.url"; + + // content repository properties + public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory."; + public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation"; + public static final String MAX_APPENDABLE_CLAIM_SIZE = "nifi.content.claim.max.appendable.size"; + public static final String MAX_FLOWFILES_PER_CLAIM = "nifi.content.claim.max.flow.files"; + public static final String CONTENT_ARCHIVE_MAX_RETENTION_PERIOD = "nifi.content.repository.archive.max.retention.period"; + public static final String CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE = "nifi.content.repository.archive.max.usage.percentage"; + public static final String CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE = "nifi.content.repository.archive.backpressure.percentage"; + public static final String CONTENT_ARCHIVE_ENABLED = "nifi.content.repository.archive.enabled"; + public static final String CONTENT_ARCHIVE_CLEANUP_FREQUENCY = "nifi.content.repository.archive.cleanup.frequency"; + public static final String CONTENT_VIEWER_URL = "nifi.content.viewer.url"; + + // flowfile repository properties + public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation"; + public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync"; + public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory"; + public static final String FLOWFILE_REPOSITORY_PARTITIONS = "nifi.flowfile.repository.partitions"; + public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval"; + public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation"; + public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold"; + public static final String SWAP_IN_THREADS = "nifi.swap.in.threads"; + public static final String SWAP_IN_PERIOD = "nifi.swap.in.period"; + public static final String SWAP_OUT_THREADS = "nifi.swap.out.threads"; + public static final String SWAP_OUT_PERIOD = "nifi.swap.out.period"; + + // provenance properties + public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation"; + public static final String PROVENANCE_REPO_DIRECTORY_PREFIX = "nifi.provenance.repository.directory."; + public static final String PROVENANCE_MAX_STORAGE_TIME = "nifi.provenance.repository.max.storage.time"; + public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size"; + public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time"; + public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size"; + public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads"; + public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads"; + public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover"; + public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields"; + public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes"; + public static final String PROVENANCE_INDEX_SHARD_SIZE = "nifi.provenance.repository.index.shard.size"; + public static final String PROVENANCE_JOURNAL_COUNT = "nifi.provenance.repository.journal.count"; + public static final String PROVENANCE_REPO_ENCRYPTION_KEY = "nifi.provenance.repository.encryption.key"; + public static final String PROVENANCE_REPO_ENCRYPTION_KEY_ID = "nifi.provenance.repository.encryption.key.id"; + public static final String PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS = "nifi.provenance.repository.encryption.key.provider.implementation"; + public static final String PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.provenance.repository.encryption.key.provider.location"; + public static final String PROVENANCE_REPO_DEBUG_FREQUENCY = "nifi.provenance.repository.debug.frequency"; + + // component status repository properties + public static final String COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION = "nifi.components.status.repository.implementation"; + public static final String COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "nifi.components.status.snapshot.frequency"; + + // security properties + public static final String SECURITY_KEYSTORE = "nifi.security.keystore"; + public static final String SECURITY_KEYSTORE_TYPE = "nifi.security.keystoreType"; + public static final String SECURITY_KEYSTORE_PASSWD = "nifi.security.keystorePasswd"; + public static final String SECURITY_KEY_PASSWD = "nifi.security.keyPasswd"; + public static final String SECURITY_TRUSTSTORE = "nifi.security.truststore"; + public static final String SECURITY_TRUSTSTORE_TYPE = "nifi.security.truststoreType"; + public static final String SECURITY_TRUSTSTORE_PASSWD = "nifi.security.truststorePasswd"; + public static final String SECURITY_USER_AUTHORIZER = "nifi.security.user.authorizer"; + public static final String SECURITY_USER_LOGIN_IDENTITY_PROVIDER = "nifi.security.user.login.identity.provider"; + public static final String SECURITY_OCSP_RESPONDER_URL = "nifi.security.ocsp.responder.url"; + public static final String SECURITY_OCSP_RESPONDER_CERTIFICATE = "nifi.security.ocsp.responder.certificate"; + public static final String SECURITY_IDENTITY_MAPPING_PATTERN_PREFIX = "nifi.security.identity.mapping.pattern."; + public static final String SECURITY_IDENTITY_MAPPING_VALUE_PREFIX = "nifi.security.identity.mapping.value."; + public static final String SECURITY_IDENTITY_MAPPING_TRANSFORM_PREFIX = "nifi.security.identity.mapping.transform."; + public static final String SECURITY_GROUP_MAPPING_PATTERN_PREFIX = "nifi.security.group.mapping.pattern."; + public static final String SECURITY_GROUP_MAPPING_VALUE_PREFIX = "nifi.security.group.mapping.value."; + public static final String SECURITY_GROUP_MAPPING_TRANSFORM_PREFIX = "nifi.security.group.mapping.transform."; + + // oidc + public static final String SECURITY_USER_OIDC_DISCOVERY_URL = "nifi.security.user.oidc.discovery.url"; + public static final String SECURITY_USER_OIDC_CONNECT_TIMEOUT = "nifi.security.user.oidc.connect.timeout"; + public static final String SECURITY_USER_OIDC_READ_TIMEOUT = "nifi.security.user.oidc.read.timeout"; + public static final String SECURITY_USER_OIDC_CLIENT_ID = "nifi.security.user.oidc.client.id"; + public static final String SECURITY_USER_OIDC_CLIENT_SECRET = "nifi.security.user.oidc.client.secret"; + public static final String SECURITY_USER_OIDC_PREFERRED_JWSALGORITHM = "nifi.security.user.oidc.preferred.jwsalgorithm"; + + // apache knox + public static final String SECURITY_USER_KNOX_URL = "nifi.security.user.knox.url"; + public static final String SECURITY_USER_KNOX_PUBLIC_KEY = "nifi.security.user.knox.publicKey"; + public static final String SECURITY_USER_KNOX_COOKIE_NAME = "nifi.security.user.knox.cookieName"; + public static final String SECURITY_USER_KNOX_AUDIENCES = "nifi.security.user.knox.audiences"; + + // web properties + public static final String WEB_WAR_DIR = "nifi.web.war.directory"; + public static final String WEB_HTTP_PORT = "nifi.web.http.port"; + public static final String WEB_HTTP_PORT_FORWARDING = "nifi.web.http.port.forwarding"; + public static final String WEB_HTTP_HOST = "nifi.web.http.host"; + public static final String WEB_HTTP_NETWORK_INTERFACE_PREFIX = "nifi.web.http.network.interface."; + public static final String WEB_HTTPS_PORT = "nifi.web.https.port"; + public static final String WEB_HTTPS_PORT_FORWARDING = "nifi.web.https.port.forwarding"; + public static final String WEB_HTTPS_HOST = "nifi.web.https.host"; + public static final String WEB_HTTPS_NETWORK_INTERFACE_PREFIX = "nifi.web.https.network.interface."; + public static final String WEB_WORKING_DIR = "nifi.web.jetty.working.directory"; + public static final String WEB_THREADS = "nifi.web.jetty.threads"; + public static final String WEB_MAX_HEADER_SIZE = "nifi.web.max.header.size"; + public static final String WEB_PROXY_CONTEXT_PATH = "nifi.web.proxy.context.path"; + public static final String WEB_PROXY_HOST = "nifi.web.proxy.host"; + + // ui properties + public static final String UI_BANNER_TEXT = "nifi.ui.banner.text"; + public static final String UI_AUTO_REFRESH_INTERVAL = "nifi.ui.autorefresh.interval"; + public static final String UI_DCAE_DISTRIBUTOR_API_URL="nifi.ui.dcae.distibutor.api.url"; + + // cluster common properties + public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval"; + public static final String CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure"; + + // cluster node properties + public static final String CLUSTER_IS_NODE = "nifi.cluster.is.node"; + public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address"; + public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port"; + public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads"; + public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads"; + public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; + public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; + public static final String CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = "nifi.cluster.node.max.concurrent.requests"; + public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file"; + public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time"; + public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates"; + + // cluster load balance properties + public static final String LOAD_BALANCE_ADDRESS = "nifi.cluster.load.balance.address"; + public static final String LOAD_BALANCE_PORT = "nifi.cluster.load.balance.port"; + public static final String LOAD_BALANCE_CONNECTIONS_PER_NODE = "nifi.cluster.load.balance.connections.per.node"; + public static final String LOAD_BALANCE_MAX_THREAD_COUNT = "nifi.cluster.load.balance.max.thread.count"; + public static final String LOAD_BALANCE_COMMS_TIMEOUT = "nifi.cluster.load.balance.comms.timeout"; + + // zookeeper properties + public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string"; + public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout"; + public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout"; + public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node"; + public static final String ZOOKEEPER_AUTH_TYPE = "nifi.zookeeper.auth.type"; + public static final String ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL = "nifi.zookeeper.kerberos.removeHostFromPrincipal"; + public static final String ZOOKEEPER_KERBEROS_REMOVE_REALM_FROM_PRINCIPAL = "nifi.zookeeper.kerberos.removeRealmFromPrincipal"; + + // kerberos properties + public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file"; + public static final String KERBEROS_SERVICE_PRINCIPAL = "nifi.kerberos.service.principal"; + public static final String KERBEROS_SERVICE_KEYTAB_LOCATION = "nifi.kerberos.service.keytab.location"; + public static final String KERBEROS_SPNEGO_PRINCIPAL = "nifi.kerberos.spnego.principal"; + public static final String KERBEROS_SPNEGO_KEYTAB_LOCATION = "nifi.kerberos.spnego.keytab.location"; + public static final String KERBEROS_AUTHENTICATION_EXPIRATION = "nifi.kerberos.spnego.authentication.expiration"; + + // state management + public static final String STATE_MANAGEMENT_CONFIG_FILE = "nifi.state.management.configuration.file"; + public static final String STATE_MANAGEMENT_LOCAL_PROVIDER_ID = "nifi.state.management.provider.local"; + public static final String STATE_MANAGEMENT_CLUSTER_PROVIDER_ID = "nifi.state.management.provider.cluster"; + public static final String STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER = "nifi.state.management.embedded.zookeeper.start"; + public static final String STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES = "nifi.state.management.embedded.zookeeper.properties"; + + // expression language properties + public static final String VARIABLE_REGISTRY_PROPERTIES = "nifi.variable.registry.properties"; + + // defaults + public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; + public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml"; + public static final String DEFAULT_LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE = "conf/login-identity-providers.xml"; + public static final Integer DEFAULT_REMOTE_INPUT_PORT = null; + public static final Path DEFAULT_TEMPLATE_DIRECTORY = Paths.get("conf", "templates"); + public static final int DEFAULT_WEB_THREADS = 200; + public static final String DEFAULT_WEB_MAX_HEADER_SIZE = "16 KB"; + public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty"; + public static final String DEFAULT_NAR_WORKING_DIR = "./work/nar"; + public static final String DEFAULT_COMPONENT_DOCS_DIRECTORY = "./work/docs/components"; + public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib"; + public static final String DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR = "./extensions"; + public static final String DEFAULT_FLOWFILE_REPO_PARTITIONS = "256"; + public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "2 min"; + public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100; + public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB"; + public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000; + public static final String DEFAULT_SWAP_STORAGE_LOCATION = "./flowfile_repository/swap"; + public static final String DEFAULT_SWAP_IN_PERIOD = "1 sec"; + public static final String DEFAULT_SWAP_OUT_PERIOD = "5 sec"; + public static final int DEFAULT_SWAP_IN_THREADS = 4; + public static final int DEFAULT_SWAP_OUT_THREADS = 4; + public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L; + public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB"; + public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec"; + public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state"; + public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins"; + public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis"; + public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs"; + public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs"; + public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi"; + public static final String DEFAULT_ZOOKEEPER_AUTH_TYPE = "default"; + public static final String DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL = "true"; + public static final String DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_REALM_FROM_PRINCIPAL = "true"; + public static final String DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL = "30 secs"; + public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_ENABLED = "true"; + public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME = "30 days"; + public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE = "500 MB"; + public static final String DEFAULT_SECURITY_USER_OIDC_CONNECT_TIMEOUT = "5 secs"; + public static final String DEFAULT_SECURITY_USER_OIDC_READ_TIMEOUT = "5 secs"; + + // DCAE related config + // REVIEW: Default is to turn off the dcae jar loading until the platform becomes more accessible/stable + public static final String DEFAULT_DCAE_JARS_INDEX_URL = ""; + + // cluster common defaults + public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec"; + public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms"; + public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3; + public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec"; + public static final String DEFAULT_CLUSTER_NODE_READ_TIMEOUT = "5 sec"; + public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec"; + public static final int DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = 100; + + // cluster node defaults + public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10; + public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS = 50; + public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs"; + public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins"; + + // cluster load balance defaults + public static final int DEFAULT_LOAD_BALANCE_PORT = 6342; + public static final int DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE = 4; + public static final int DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT = 8; + public static final String DEFAULT_LOAD_BALANCE_COMMS_TIMEOUT = "30 sec"; + + + // state management defaults + public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = "conf/state-management.xml"; + + // Kerberos defaults + public static final String DEFAULT_KERBEROS_AUTHENTICATION_EXPIRATION = "12 hours"; + + + /** + * Retrieves the property value for the given property key. + * + * @param key the key of property value to lookup + * @return value of property at given key or null if not found + */ + public abstract String getProperty(String key); + + /** + * Retrieves all known property keys. + * + * @return all known property keys + */ + public abstract Set getPropertyKeys(); + + // getters for core properties // + public File getFlowConfigurationFile() { + try { + return new File(getProperty(FLOW_CONFIGURATION_FILE)); + } catch (Exception ex) { + return null; + } + } + + public File getFlowConfigurationFileDir() { + try { + return getFlowConfigurationFile().getParentFile(); + } catch (Exception ex) { + return null; + } + } + + private Integer getPropertyAsPort(final String propertyName, final Integer defaultValue) { + final String port = getProperty(propertyName); + if (StringUtils.isEmpty(port)) { + return defaultValue; + } + try { + final int val = Integer.parseInt(port); + if (val <= 0 || val > 65535) { + throw new RuntimeException("Valid port range is 0 - 65535 but got " + val); + } + return val; + } catch (final NumberFormatException e) { + return defaultValue; + } + } + + public int getQueueSwapThreshold() { + final String thresholdValue = getProperty(QUEUE_SWAP_THRESHOLD); + if (thresholdValue == null) { + return DEFAULT_QUEUE_SWAP_THRESHOLD; + } + + try { + return Integer.parseInt(thresholdValue); + } catch (final NumberFormatException e) { + return DEFAULT_QUEUE_SWAP_THRESHOLD; + } + } + + public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) { + final String value = getProperty(propertyName); + if (value == null || value.trim().isEmpty()) { + return defaultValue; + } + + try { + return Integer.parseInt(value.trim()); + } catch (final Exception e) { + return defaultValue; + } + } + + public int getSwapInThreads() { + return getIntegerProperty(SWAP_IN_THREADS, DEFAULT_SWAP_IN_THREADS); + } + + public int getSwapOutThreads() { + final String value = getProperty(SWAP_OUT_THREADS); + if (value == null) { + return DEFAULT_SWAP_OUT_THREADS; + } + + try { + return Integer.parseInt(getProperty(SWAP_OUT_THREADS)); + } catch (final Exception e) { + return DEFAULT_SWAP_OUT_THREADS; + } + } + + public String getSwapInPeriod() { + return getProperty(SWAP_IN_PERIOD, DEFAULT_SWAP_IN_PERIOD); + } + + public String getSwapOutPeriod() { + return getProperty(SWAP_OUT_PERIOD, DEFAULT_SWAP_OUT_PERIOD); + } + + public String getAdministrativeYieldDuration() { + return getProperty(ADMINISTRATIVE_YIELD_DURATION, DEFAULT_ADMINISTRATIVE_YIELD_DURATION); + } + + /** + * The host name that will be given out to clients to connect to the Remote + * Input Port. + * + * @return the remote input host name or null if not configured + */ + public String getRemoteInputHost() { + final String value = getProperty(REMOTE_INPUT_HOST); + return StringUtils.isBlank(value) ? null : value; + } + + /** + * The socket port to listen on for a Remote Input Port. + * + * @return the remote input port for RAW socket communication + */ + public Integer getRemoteInputPort() { + return getPropertyAsPort(REMOTE_INPUT_PORT, DEFAULT_REMOTE_INPUT_PORT); + } + + /** + * @return False if property value is 'false'; True otherwise. + */ + public Boolean isSiteToSiteSecure() { + final String secureVal = getProperty(SITE_TO_SITE_SECURE, "true"); + + return !"false".equalsIgnoreCase(secureVal); + + } + + /** + * @return True if property value is 'true'; False otherwise. + */ + public Boolean isSiteToSiteHttpEnabled() { + final String remoteInputHttpEnabled = getProperty(SITE_TO_SITE_HTTP_ENABLED, "false"); + + return "true".equalsIgnoreCase(remoteInputHttpEnabled); + + } + + /** + * The HTTP or HTTPS Web API port for a Remote Input Port. + * + * @return the remote input port for HTTP(S) communication, or null if + * HTTP(S) Site-to-Site is not enabled + */ + public Integer getRemoteInputHttpPort() { + if (!isSiteToSiteHttpEnabled()) { + return null; + } + + final String propertyKey; + if (isSiteToSiteSecure()) { + if (StringUtils.isBlank(getProperty(NiFiProperties.WEB_HTTPS_PORT_FORWARDING))) { + propertyKey = WEB_HTTPS_PORT; + } else { + propertyKey = WEB_HTTPS_PORT_FORWARDING; + } + } else { + if (StringUtils.isBlank(getProperty(NiFiProperties.WEB_HTTP_PORT_FORWARDING))) { + propertyKey = WEB_HTTP_PORT; + } else { + propertyKey = WEB_HTTP_PORT_FORWARDING; + } + } + + final Integer port = getIntegerProperty(propertyKey, null); + if (port == null) { + throw new RuntimeException("Remote input HTTP" + (isSiteToSiteSecure() ? "S" : "") + + " is enabled but " + propertyKey + " is not specified."); + } + return port; + } + + /** + * Returns the directory to which Templates are to be persisted + * + * @return the template directory + */ + public Path getTemplateDirectory() { + final String strVal = getProperty(TEMPLATE_DIRECTORY); + return (strVal == null) ? DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal); + } + + /** + * Get the flow service write delay. + * + * @return The write delay + */ + public String getFlowServiceWriteDelay() { + return getProperty(WRITE_DELAY_INTERVAL); + } + + /** + * Returns whether the processors should be started automatically when the + * application loads. + * + * @return Whether to auto start the processors or not + */ + public boolean getAutoResumeState() { + final String rawAutoResumeState = getProperty(AUTO_RESUME_STATE, + DEFAULT_AUTO_RESUME_STATE.toString()); + return Boolean.parseBoolean(rawAutoResumeState); + } + + /** + * Returns the number of partitions that should be used for the FlowFile + * Repository + * + * @return the number of partitions + */ + public int getFlowFileRepositoryPartitions() { + final String rawProperty = getProperty(FLOWFILE_REPOSITORY_PARTITIONS, + DEFAULT_FLOWFILE_REPO_PARTITIONS); + return Integer.parseInt(rawProperty); + } + + /** + * Returns the number of milliseconds between FlowFileRepository + * checkpointing + * + * @return the number of milliseconds between checkpoint events + */ + public String getFlowFileRepositoryCheckpointInterval() { + return getProperty(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, + DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL); + } + + /** + * @return the restore directory or null if not configured + */ + public File getRestoreDirectory() { + final String value = getProperty(RESTORE_DIRECTORY); + if (StringUtils.isBlank(value)) { + return null; + } else { + return new File(value); + } + } + + /** + * @return the user authorizers file + */ + public File getAuthorizerConfigurationFile() { + final String value = getProperty(AUTHORIZER_CONFIGURATION_FILE); + if (StringUtils.isBlank(value)) { + return new File(DEFAULT_AUTHORIZER_CONFIGURATION_FILE); + } else { + return new File(value); + } + } + + /** + * @return the user login identity provider file + */ + public File getLoginIdentityProviderConfigurationFile() { + final String value = getProperty(LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE); + if (StringUtils.isBlank(value)) { + return new File(DEFAULT_LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE); + } else { + return new File(value); + } + } + + // getters for web properties // + public Integer getPort() { + Integer port = null; + try { + port = Integer.parseInt(getProperty(WEB_HTTP_PORT)); + } catch (NumberFormatException nfe) { + } + return port; + } + + public Integer getSslPort() { + Integer sslPort = null; + try { + sslPort = Integer.parseInt(getProperty(WEB_HTTPS_PORT)); + } catch (NumberFormatException nfe) { + } + return sslPort; + } + + public boolean isHTTPSConfigured() { + return getSslPort() != null; + } + + /** + * Determines the HTTP/HTTPS port NiFi is configured to bind to. Prefers the HTTPS port. Throws an exception if neither is configured. + * + * @return the configured port number + */ + public Integer getConfiguredHttpOrHttpsPort() throws RuntimeException { + if (getSslPort() != null) { + return getSslPort(); + } else if (getPort() != null) { + return getPort(); + } else { + throw new RuntimeException("The HTTP or HTTPS port must be configured"); + } + } + + public String getWebMaxHeaderSize() { + return getProperty(WEB_MAX_HEADER_SIZE, DEFAULT_WEB_MAX_HEADER_SIZE); + } + + public int getWebThreads() { + return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS); + } + + public int getClusterNodeMaxConcurrentRequests() { + return getIntegerProperty(CLUSTER_NODE_MAX_CONCURRENT_REQUESTS, DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS); + } + + public File getWebWorkingDirectory() { + return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR)); + } + + public File getComponentDocumentationWorkingDirectory() { + return new File(getProperty(COMPONENT_DOCS_DIRECTORY, DEFAULT_COMPONENT_DOCS_DIRECTORY)); + } + + public File getNarWorkingDirectory() { + return new File(getProperty(NAR_WORKING_DIRECTORY, DEFAULT_NAR_WORKING_DIR)); + } + + public File getFrameworkWorkingDirectory() { + return new File(getNarWorkingDirectory(), "framework"); + } + + public File getExtensionsWorkingDirectory() { + return new File(getNarWorkingDirectory(), "extensions"); + } + + public List getNarLibraryDirectories() { + + List narLibraryPaths = new ArrayList<>(); + + // go through each property + for (String propertyName : getPropertyKeys()) { + // determine if the property is a nar library path + if (StringUtils.startsWith(propertyName, NAR_LIBRARY_DIRECTORY_PREFIX) + || NAR_LIBRARY_DIRECTORY.equals(propertyName) + || NAR_LIBRARY_AUTOLOAD_DIRECTORY.equals(propertyName)) { + // attempt to resolve the path specified + String narLib = getProperty(propertyName); + if (!StringUtils.isBlank(narLib)) { + narLibraryPaths.add(Paths.get(narLib)); + } + } + } + + if (narLibraryPaths.isEmpty()) { + narLibraryPaths.add(Paths.get(DEFAULT_NAR_LIBRARY_DIR)); + } + + return narLibraryPaths; + } + + public File getNarAutoLoadDirectory() { + return new File(getProperty(NAR_LIBRARY_AUTOLOAD_DIRECTORY, DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR)); + } + + /** + * Retrieves a URI to the index that contains URLs to all the DCAE jars to be loaded into Nifi. + * Refer to the genprocessor project for more info. + * + * Not setting the underlying configuration parameter "nifi.dcae.jar.index.url" is not + * fatal. Nifi will just skip over trying to load DCAE jars. + * + * @return + * @throws URISyntaxException + */ + public URI getDCAEJarIndexURI() throws URISyntaxException { + String strUrl = getProperty(DCAE_JARS_INDEX_URL, DEFAULT_DCAE_JARS_INDEX_URL); + + if (strUrl == null || strUrl.isEmpty()) { + return null; + } else { + return new URI(strUrl); + } + } + + // getters for ui properties // + + /** + * Get the banner text. + * + * @return The banner text + */ + public String getBannerText() { + return this.getProperty(UI_BANNER_TEXT, StringUtils.EMPTY); + } + + + /** + * @author Renu + * @return the IP address where the nifi-app is being hosted + */ + public String getDcaeDistributorApiHostname() { + return getProperty(UI_DCAE_DISTRIBUTOR_API_URL); + } + + /** + * Returns the auto refresh interval in seconds. + * + * @return the interval over which the properties should auto refresh + */ + public String getAutoRefreshInterval() { + return getProperty(UI_AUTO_REFRESH_INTERVAL); + } + + // getters for cluster protocol properties // + public String getClusterProtocolHeartbeatInterval() { + return getProperty(CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, + DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); + } + + public String getNodeHeartbeatInterval() { + return getClusterProtocolHeartbeatInterval(); + } + + public String getClusterNodeReadTimeout() { + return getProperty(CLUSTER_NODE_READ_TIMEOUT, DEFAULT_CLUSTER_NODE_READ_TIMEOUT); + } + + public String getClusterNodeConnectionTimeout() { + return getProperty(CLUSTER_NODE_CONNECTION_TIMEOUT, + DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT); + } + + public File getPersistentStateDirectory() { + final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY, + DEFAULT_PERSISTENT_STATE_DIRECTORY); + final File file = new File(dirName); + if (!file.exists()) { + file.mkdirs(); + } + return file; + } + + // getters for cluster node properties // + public boolean isNode() { + return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); + } + + public InetSocketAddress getClusterNodeProtocolAddress() { + try { + String socketAddress = getProperty(CLUSTER_NODE_ADDRESS); + if (StringUtils.isBlank(socketAddress)) { + socketAddress = "localhost"; + } + int socketPort = getClusterNodeProtocolPort(); + return InetSocketAddress.createUnresolved(socketAddress, socketPort); + } catch (Exception ex) { + throw new RuntimeException("Invalid node protocol address/port due to: " + ex, ex); + } + } + + public InetSocketAddress getClusterLoadBalanceAddress() { + try { + String address = getProperty(LOAD_BALANCE_ADDRESS); + if (StringUtils.isBlank(address)) { + address = getProperty(CLUSTER_NODE_ADDRESS); + } + if (StringUtils.isBlank(address)) { + address = "localhost"; + } + + final int port = getIntegerProperty(LOAD_BALANCE_PORT, DEFAULT_LOAD_BALANCE_PORT); + return InetSocketAddress.createUnresolved(address, port); + } catch (final Exception e) { + throw new RuntimeException("Invalid load balance address/port due to: " + e, e); + } + } + + public Integer getClusterNodeProtocolPort() { + try { + return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT)); + } catch (NumberFormatException nfe) { + return null; + } + } + + /** + * @deprecated Use getClusterNodeProtocolCorePoolSize() and getClusterNodeProtocolMaxPoolSize() instead + */ + @Deprecated() + public int getClusterNodeProtocolThreads() { + return getClusterNodeProtocolCorePoolSize(); + } + + public int getClusterNodeProtocolCorePoolSize() { + try { + return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS)); + } catch (NumberFormatException nfe) { + return DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS; + } + } + + public int getClusterNodeProtocolMaxPoolSize() { + try { + return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_MAX_THREADS)); + } catch (NumberFormatException nfe) { + return DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS; + } + } + + public boolean isClustered() { + return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); + } + + public File getClusterNodeFirewallFile() { + final String firewallFile = getProperty(CLUSTER_FIREWALL_FILE); + if (StringUtils.isBlank(firewallFile)) { + return null; + } else { + return new File(firewallFile); + } + } + + public String getClusterProtocolManagerToNodeApiScheme() { + final String isSecureProperty = getProperty(CLUSTER_PROTOCOL_IS_SECURE); + if (Boolean.valueOf(isSecureProperty)) { + return "https"; + } else { + return "http"; + } + } + + public File getKerberosConfigurationFile() { + final String krb5File = getProperty(KERBEROS_KRB5_FILE); + if (krb5File != null && krb5File.trim().length() > 0) { + return new File(krb5File.trim()); + } else { + return null; + } + } + + public String getKerberosServicePrincipal() { + final String servicePrincipal = getProperty(KERBEROS_SERVICE_PRINCIPAL); + if (!StringUtils.isBlank(servicePrincipal)) { + return servicePrincipal.trim(); + } else { + return null; + } + } + + public String getKerberosServiceKeytabLocation() { + final String keytabLocation = getProperty(KERBEROS_SERVICE_KEYTAB_LOCATION); + if (!StringUtils.isBlank(keytabLocation)) { + return keytabLocation.trim(); + } else { + return null; + } + } + + public String getKerberosSpnegoPrincipal() { + final String spengoPrincipal = getProperty(KERBEROS_SPNEGO_PRINCIPAL); + if (!StringUtils.isBlank(spengoPrincipal)) { + return spengoPrincipal.trim(); + } else { + return null; + } + } + + public String getKerberosSpnegoKeytabLocation() { + final String keytabLocation = getProperty(KERBEROS_SPNEGO_KEYTAB_LOCATION); + if (!StringUtils.isBlank(keytabLocation)) { + return keytabLocation.trim(); + } else { + return null; + } + } + + public String getKerberosAuthenticationExpiration() { + final String authenticationExpirationString = getProperty(KERBEROS_AUTHENTICATION_EXPIRATION, DEFAULT_KERBEROS_AUTHENTICATION_EXPIRATION); + if (!StringUtils.isBlank(authenticationExpirationString)) { + return authenticationExpirationString.trim(); + } else { + return null; + } + } + + /** + * Returns true if the Kerberos service principal and keytab location + * properties are populated. + * + * @return true if Kerberos service support is enabled + */ + public boolean isKerberosSpnegoSupportEnabled() { + return !StringUtils.isBlank(getKerberosSpnegoPrincipal()) && !StringUtils.isBlank(getKerberosSpnegoKeytabLocation()); + } + + /** + * Returns true if the login identity provider has been configured. + * + * @return true if the login identity provider has been configured + */ + public boolean isLoginIdentityProviderEnabled() { + return !StringUtils.isBlank(getProperty(NiFiProperties.SECURITY_USER_LOGIN_IDENTITY_PROVIDER)); + } + + /** + * Returns whether an OpenId Connect (OIDC) URL is set. + * + * @return whether an OpenId Connection URL is set + */ + public boolean isOidcEnabled() { + return !StringUtils.isBlank(getOidcDiscoveryUrl()); + } + + /** + * Returns the OpenId Connect (OIDC) URL. Null otherwise. + * + * @return OIDC discovery url + */ + public String getOidcDiscoveryUrl() { + return getProperty(SECURITY_USER_OIDC_DISCOVERY_URL); + } + + /** + * Returns the OpenId Connect connect timeout. Non null. + * + * @return OIDC connect timeout + */ + public String getOidcConnectTimeout() { + return getProperty(SECURITY_USER_OIDC_CONNECT_TIMEOUT, DEFAULT_SECURITY_USER_OIDC_CONNECT_TIMEOUT); + } + + /** + * Returns the OpenId Connect read timeout. Non null. + * + * @return OIDC read timeout + */ + public String getOidcReadTimeout() { + return getProperty(SECURITY_USER_OIDC_READ_TIMEOUT, DEFAULT_SECURITY_USER_OIDC_READ_TIMEOUT); + } + + /** + * Returns the OpenId Connect client id. + * + * @return OIDC client id + */ + public String getOidcClientId() { + return getProperty(SECURITY_USER_OIDC_CLIENT_ID); + } + + /** + * Returns the OpenId Connect client secret. + * + * @return OIDC client secret + */ + public String getOidcClientSecret() { + return getProperty(SECURITY_USER_OIDC_CLIENT_SECRET); + } + + /** + * Returns the preferred json web signature algorithm. May be null/blank. + * + * @return OIDC preferred json web signature algorithm + */ + public String getOidcPreferredJwsAlgorithm() { + return getProperty(SECURITY_USER_OIDC_PREFERRED_JWSALGORITHM); + } + + /** + * Returns whether Knox SSO is enabled. + * + * @return whether Knox SSO is enabled + */ + public boolean isKnoxSsoEnabled() { + return !StringUtils.isBlank(getKnoxUrl()); + } + + /** + * Returns the Knox URL. + * + * @return Knox URL + */ + public String getKnoxUrl() { + return getProperty(SECURITY_USER_KNOX_URL); + } + + /** + * Gets the configured Knox Audiences. + * + * @return Knox audiences + */ + public Set getKnoxAudiences() { + final String rawAudiences = getProperty(SECURITY_USER_KNOX_AUDIENCES); + if (StringUtils.isBlank(rawAudiences)) { + return null; + } else { + final String[] audienceTokens = rawAudiences.split(","); + return Stream.of(audienceTokens).map(String::trim).filter(aud -> !StringUtils.isEmpty(aud)).collect(Collectors.toSet()); + } + } + + /** + * Returns the path to the Knox public key. + * + * @return path to the Knox public key + */ + public Path getKnoxPublicKeyPath() { + return Paths.get(getProperty(SECURITY_USER_KNOX_PUBLIC_KEY)); + } + + /** + * Returns the name of the Knox cookie. + * + * @return name of the Knox cookie + */ + public String getKnoxCookieName() { + return getProperty(SECURITY_USER_KNOX_COOKIE_NAME); + } + + /** + * Returns true if client certificates are required for REST API. Determined + * if the following conditions are all true: + *

+ * - login identity provider is not populated + * - Kerberos service support is not enabled + * - openid connect is not enabled + * - knox sso is not enabled + *

+ * + * @return true if client certificates are required for access to the REST API + */ + public boolean isClientAuthRequiredForRestApi() { + return !isLoginIdentityProviderEnabled() && !isKerberosSpnegoSupportEnabled() && !isOidcEnabled() && !isKnoxSsoEnabled(); + } + + public InetSocketAddress getNodeApiAddress() { + + final String rawScheme = getClusterProtocolManagerToNodeApiScheme(); + final String scheme = (rawScheme == null) ? "http" : rawScheme; + + final String host; + final Integer port; + if ("http".equalsIgnoreCase(scheme)) { + // get host + if (StringUtils.isBlank(getProperty(WEB_HTTP_HOST))) { + host = "localhost"; + } else { + host = getProperty(WEB_HTTP_HOST); + } + // get port + port = getPort(); + + if (port == null) { + throw new RuntimeException(String.format("The %s must be specified if running in a cluster with %s set to false.", WEB_HTTP_PORT, CLUSTER_PROTOCOL_IS_SECURE)); + } + } else { + // get host + if (StringUtils.isBlank(getProperty(WEB_HTTPS_HOST))) { + host = "localhost"; + } else { + host = getProperty(WEB_HTTPS_HOST); + } + // get port + port = getSslPort(); + + if (port == null) { + throw new RuntimeException(String.format("The %s must be specified if running in a cluster with %s set to true.", WEB_HTTPS_PORT, CLUSTER_PROTOCOL_IS_SECURE)); + } + } + + return InetSocketAddress.createUnresolved(host, port); + + } + + /** + * Returns the database repository path. It simply returns the value + * configured. No directories will be created as a result of this operation. + * + * @return database repository path + * @throws InvalidPathException If the configured path is invalid + */ + public Path getDatabaseRepositoryPath() { + return Paths.get(getProperty(REPOSITORY_DATABASE_DIRECTORY)); + } + + /** + * Returns the flow file repository path. It simply returns the value + * configured. No directories will be created as a result of this operation. + * + * @return database repository path + * @throws InvalidPathException If the configured path is invalid + */ + public Path getFlowFileRepositoryPath() { + return Paths.get(getProperty(FLOWFILE_REPOSITORY_DIRECTORY)); + } + + /** + * Returns the content repository paths. This method returns a mapping of + * file repository name to file repository paths. It simply returns the + * values configured. No directories will be created as a result of this + * operation. + * + * @return file repositories paths + * @throws InvalidPathException If any of the configured paths are invalid + */ + public Map getContentRepositoryPaths() { + final Map contentRepositoryPaths = new HashMap<>(); + + // go through each property + for (String propertyName : getPropertyKeys()) { + // determine if the property is a file repository path + if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) { + // get the repository key + final String key = StringUtils.substringAfter(propertyName, + REPOSITORY_CONTENT_PREFIX); + + // attempt to resolve the path specified + contentRepositoryPaths.put(key, Paths.get(getProperty(propertyName))); + } + } + return contentRepositoryPaths; + } + + /** + * Returns the provenance repository paths. This method returns a mapping of + * file repository name to file repository paths. It simply returns the + * values configured. No directories will be created as a result of this + * operation. + * + * @return the name and paths of all provenance repository locations + */ + public Map getProvenanceRepositoryPaths() { + final Map provenanceRepositoryPaths = new HashMap<>(); + + // go through each property + for (String propertyName : getPropertyKeys()) { + // determine if the property is a file repository path + if (StringUtils.startsWith(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX)) { + // get the repository key + final String key = StringUtils.substringAfter(propertyName, + PROVENANCE_REPO_DIRECTORY_PREFIX); + + // attempt to resolve the path specified + provenanceRepositoryPaths.put(key, Paths.get(getProperty(propertyName))); + } + } + return provenanceRepositoryPaths; + } + + /** + * Returns the number of claims to keep open for writing. Ideally, this will be at + * least as large as the number of threads that will be updating the repository simultaneously but we don't want + * to get too large because it will hold open up to this many FileOutputStreams. + *

+ * Default is {@link #DEFAULT_MAX_FLOWFILES_PER_CLAIM} + * + * @return the maximum number of flow files per claim + */ + public int getMaxFlowFilesPerClaim() { + try { + return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM)); + } catch (NumberFormatException nfe) { + return DEFAULT_MAX_FLOWFILES_PER_CLAIM; + } + } + + /** + * Returns the maximum size, in bytes, that claims should grow before writing a new file. This means that we won't continually write to one + * file that keeps growing but gives us a chance to bunch together many small files. + *

+ * Default is {@link #DEFAULT_MAX_APPENDABLE_CLAIM_SIZE} + * + * @return the maximum appendable claim size + */ + public String getMaxAppendableClaimSize() { + return getProperty(MAX_APPENDABLE_CLAIM_SIZE, DEFAULT_MAX_APPENDABLE_CLAIM_SIZE); + } + + public String getProperty(final String key, final String defaultValue) { + final String value = getProperty(key); + return (value == null || value.trim().isEmpty()) ? defaultValue : value; + } + + public String getBoredYieldDuration() { + return getProperty(BORED_YIELD_DURATION, DEFAULT_BORED_YIELD_DURATION); + } + + public File getStateManagementConfigFile() { + return new File(getProperty(STATE_MANAGEMENT_CONFIG_FILE, DEFAULT_STATE_MANAGEMENT_CONFIG_FILE)); + } + + public String getLocalStateProviderId() { + return getProperty(STATE_MANAGEMENT_LOCAL_PROVIDER_ID); + } + + public String getClusterStateProviderId() { + return getProperty(STATE_MANAGEMENT_CLUSTER_PROVIDER_ID); + } + + public File getEmbeddedZooKeeperPropertiesFile() { + final String filename = getProperty(STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES); + return filename == null ? null : new File(filename); + } + + public boolean isStartEmbeddedZooKeeper() { + return Boolean.parseBoolean(getProperty(STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER)); + } + + public boolean isFlowConfigurationArchiveEnabled() { + return Boolean.parseBoolean(getProperty(FLOW_CONFIGURATION_ARCHIVE_ENABLED, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_ENABLED)); + } + + public String getFlowConfigurationArchiveDir() { + return getProperty(FLOW_CONFIGURATION_ARCHIVE_DIR); + } + + public String getFlowElectionMaxWaitTime() { + return getProperty(FLOW_ELECTION_MAX_WAIT_TIME, DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME); + } + + public Integer getFlowElectionMaxCandidates() { + return getIntegerProperty(FLOW_ELECTION_MAX_CANDIDATES, null); + } + + public String getFlowConfigurationArchiveMaxTime() { + return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_TIME, null); + } + + public String getFlowConfigurationArchiveMaxStorage() { + return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE, null); + } + + public Integer getFlowConfigurationArchiveMaxCount() { + return getIntegerProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_COUNT, null); + } + + public String getVariableRegistryProperties() { + return getProperty(VARIABLE_REGISTRY_PROPERTIES); + } + + public Path[] getVariableRegistryPropertiesPaths() { + final List vrPropertiesPaths = new ArrayList<>(); + + final String vrPropertiesFiles = getVariableRegistryProperties(); + if (!StringUtils.isEmpty(vrPropertiesFiles)) { + + final List vrPropertiesFileList = Arrays.asList(vrPropertiesFiles.split(",")); + + for (String propertiesFile : vrPropertiesFileList) { + vrPropertiesPaths.add(Paths.get(propertiesFile)); + } + + return vrPropertiesPaths.toArray(new Path[vrPropertiesPaths.size()]); + } else { + return new Path[]{}; + } + } + + /** + * Returns the network interface list to use for HTTP. This method returns a mapping of + * network interface property names to network interface names. + * + * @return the property name and network interface name of all HTTP network interfaces + */ + public Map getHttpNetworkInterfaces() { + final Map networkInterfaces = new HashMap<>(); + + // go through each property + for (String propertyName : getPropertyKeys()) { + // determine if the property is a network interface name + if (StringUtils.startsWith(propertyName, WEB_HTTP_NETWORK_INTERFACE_PREFIX)) { + // get the network interface property key + final String key = StringUtils.substringAfter(propertyName, + WEB_HTTP_NETWORK_INTERFACE_PREFIX); + networkInterfaces.put(key, getProperty(propertyName)); + } + } + return networkInterfaces; + } + + /** + * Returns the network interface list to use for HTTPS. This method returns a mapping of + * network interface property names to network interface names. + * + * @return the property name and network interface name of all HTTPS network interfaces + */ + public Map getHttpsNetworkInterfaces() { + final Map networkInterfaces = new HashMap<>(); + + // go through each property + for (String propertyName : getPropertyKeys()) { + // determine if the property is a network interface name + if (StringUtils.startsWith(propertyName, WEB_HTTPS_NETWORK_INTERFACE_PREFIX)) { + // get the network interface property key + final String key = StringUtils.substringAfter(propertyName, + WEB_HTTPS_NETWORK_INTERFACE_PREFIX); + networkInterfaces.put(key, getProperty(propertyName)); + } + } + return networkInterfaces; + } + + public int size() { + return getPropertyKeys().size(); + } + + public String getProvenanceRepoEncryptionKeyId() { + return getProperty(PROVENANCE_REPO_ENCRYPTION_KEY_ID); + } + + /** + * Returns the active provenance repository encryption key if a {@code StaticKeyProvider} is in use. + * If no key ID is specified in the properties file, the default + * {@code nifi.provenance.repository.encryption.key} value is returned. If a key ID is specified in + * {@code nifi.provenance.repository.encryption.key.id}, it will attempt to read from + * {@code nifi.provenance.repository.encryption.key.id.XYZ} where {@code XYZ} is the provided key + * ID. If that value is empty, it will use the default property + * {@code nifi.provenance.repository.encryption.key}. + * + * @return the provenance repository encryption key in hex form + */ + public String getProvenanceRepoEncryptionKey() { + String keyId = getProvenanceRepoEncryptionKeyId(); + String keyKey = StringUtils.isBlank(keyId) ? PROVENANCE_REPO_ENCRYPTION_KEY : PROVENANCE_REPO_ENCRYPTION_KEY + ".id." + keyId; + return getProperty(keyKey, getProperty(PROVENANCE_REPO_ENCRYPTION_KEY)); + } + + /** + * Returns a map of keyId -> key in hex loaded from the {@code nifi.properties} file if a + * {@code StaticKeyProvider} is defined. If {@code FileBasedKeyProvider} is defined, use + * {@code CryptoUtils#readKeys()} instead -- this method will return an empty map. + * + * @return a Map of the keys identified by key ID + */ + public Map getProvenanceRepoEncryptionKeys() { + Map keys = new HashMap<>(); + List keyProperties = getProvenanceRepositoryEncryptionKeyProperties(); + + // Retrieve the actual key values and store non-empty values in the map + for (String prop : keyProperties) { + final String value = getProperty(prop); + if (!StringUtils.isBlank(value)) { + if (prop.equalsIgnoreCase(PROVENANCE_REPO_ENCRYPTION_KEY)) { + prop = getProvenanceRepoEncryptionKeyId(); + } else { + // Extract nifi.provenance.repository.encryption.key.id.key1 -> key1 + prop = prop.substring(prop.lastIndexOf(".") + 1); + } + keys.put(prop, value); + } + } + return keys; + } + + /** + * Returns the whitelisted proxy hostnames (and IP addresses) as a comma-delimited string. + * The hosts have been normalized to the form {@code somehost.com}, {@code somehost.com:port}, or {@code 127.0.0.1}. + *

+ * Note: Calling {@code NiFiProperties.getProperty(NiFiProperties.WEB_PROXY_HOST)} will not normalize the hosts. + * + * @return the hostname(s) + */ + public String getWhitelistedHosts() { + return StringUtils.join(getWhitelistedHostsAsList(), ","); + } + + /** + * Returns the whitelisted proxy hostnames (and IP addresses) as a List. The hosts have been normalized to the form {@code somehost.com}, {@code somehost.com:port}, or {@code 127.0.0.1}. + * + * @return the hostname(s) + */ + public List getWhitelistedHostsAsList() { + String rawProperty = getProperty(WEB_PROXY_HOST, ""); + List hosts = Arrays.asList(rawProperty.split(",")); + return hosts.stream() + .map(this::normalizeHost).filter(host -> !StringUtils.isBlank(host)).collect(Collectors.toList()); + } + + String normalizeHost(String host) { + if (host == null || host.equalsIgnoreCase("")) { + return ""; + } else { + return host.trim(); + } + } + + /** + * Returns the whitelisted proxy context paths as a comma-delimited string. The paths have been normalized to the form {@code /some/context/path}. + *

+ * Note: Calling {@code NiFiProperties.getProperty(NiFiProperties.WEB_PROXY_CONTEXT_PATH)} will not normalize the paths. + * + * @return the path(s) + */ + public String getWhitelistedContextPaths() { + return StringUtils.join(getWhitelistedContextPathsAsList(), ","); + } + + /** + * Returns the whitelisted proxy context paths as a list of paths. The paths have been normalized to the form {@code /some/context/path}. + * + * @return the path(s) + */ + public List getWhitelistedContextPathsAsList() { + String rawProperty = getProperty(WEB_PROXY_CONTEXT_PATH, ""); + List contextPaths = Arrays.asList(rawProperty.split(",")); + return contextPaths.stream() + .map(this::normalizeContextPath).collect(Collectors.toList()); + } + + private String normalizeContextPath(String cp) { + if (cp == null || cp.equalsIgnoreCase("")) { + return ""; + } else { + String trimmedCP = cp.trim(); + // Ensure it starts with a leading slash and does not end in a trailing slash + // There's a potential for the path to be something like bad/path/// but this is semi-trusted data from an admin-accessible file and there are way worse possibilities here + trimmedCP = trimmedCP.startsWith("/") ? trimmedCP : "/" + trimmedCP; + trimmedCP = trimmedCP.endsWith("/") ? trimmedCP.substring(0, trimmedCP.length() - 1) : trimmedCP; + return trimmedCP; + } + } + + private List getProvenanceRepositoryEncryptionKeyProperties() { + // Filter all the property keys that define a key + return getPropertyKeys().stream().filter(k -> + k.startsWith(PROVENANCE_REPO_ENCRYPTION_KEY_ID + ".") || k.equalsIgnoreCase(PROVENANCE_REPO_ENCRYPTION_KEY) + ).collect(Collectors.toList()); + } + + public Long getDefaultBackPressureObjectThreshold() { + long backPressureCount; + try { + String backPressureCountStr = getProperty(BACKPRESSURE_COUNT); + if (backPressureCountStr == null || backPressureCountStr.trim().isEmpty()) { + backPressureCount = DEFAULT_BACKPRESSURE_COUNT; + } else { + backPressureCount = Long.parseLong(backPressureCountStr); + } + } catch (NumberFormatException nfe) { + backPressureCount = DEFAULT_BACKPRESSURE_COUNT; + } + return backPressureCount; + } + + public String getDefaultBackPressureDataSizeThreshold() { + return getProperty(BACKPRESSURE_SIZE, DEFAULT_BACKPRESSURE_SIZE); + } + + /** + * Creates an instance of NiFiProperties. This should likely not be called + * by any classes outside of the NiFi framework but can be useful by the + * framework for default property loading behavior or helpful in tests + * needing to create specific instances of NiFiProperties. If properties + * file specified cannot be found/read a runtime exception will be thrown. + * If one is not specified no properties will be loaded by default. + * + * @param propertiesFilePath if provided properties will be loaded from + * given file; else will be loaded from System property. Can be null. + * @param additionalProperties allows overriding of properties with the + * supplied values. these will be applied after loading from any properties + * file. Can be null or empty. + * @return NiFiProperties + */ + public static NiFiProperties createBasicNiFiProperties(final String propertiesFilePath, final Map additionalProperties) { + final Map addProps = (additionalProperties == null) ? Collections.EMPTY_MAP : additionalProperties; + final Properties properties = new Properties(); + final String nfPropertiesFilePath = (propertiesFilePath == null) + ? System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH) + : propertiesFilePath; + if (nfPropertiesFilePath != null) { + final File propertiesFile = new File(nfPropertiesFilePath.trim()); + if (!propertiesFile.exists()) { + throw new RuntimeException("Properties file doesn't exist \'" + + propertiesFile.getAbsolutePath() + "\'"); + } + if (!propertiesFile.canRead()) { + throw new RuntimeException("Properties file exists but cannot be read \'" + + propertiesFile.getAbsolutePath() + "\'"); + } + InputStream inStream = null; + try { + inStream = new BufferedInputStream(new FileInputStream(propertiesFile)); + properties.load(inStream); + } catch (final Exception ex) { + throw new RuntimeException("Cannot load properties file due to " + + ex.getLocalizedMessage(), ex); + } finally { + if (null != inStream) { + try { + inStream.close(); + } catch (final Exception ex) { + /** + * do nothing * + */ + } + } + } + } + addProps.entrySet().stream().forEach((entry) -> { + properties.setProperty(entry.getKey(), entry.getValue()); + }); + return new NiFiProperties() { + @Override + public String getProperty(String key) { + return properties.getProperty(key); + } + + @Override + public Set getPropertyKeys() { + return properties.stringPropertyNames(); + } + }; + } + + /** + * This method is used to validate the NiFi properties when the file is loaded + * for the first time. The objective is to stop NiFi startup in case a property + * is not correctly configured and could cause issues afterwards. + */ + public void validate() { + // REMOTE_INPUT_HOST should be a valid hostname + String remoteInputHost = getProperty(REMOTE_INPUT_HOST); + if (!StringUtils.isBlank(remoteInputHost) && remoteInputHost.split(":").length > 1) { // no scheme/port needed here (http://) + throw new IllegalArgumentException(remoteInputHost + " is not a correct value for " + REMOTE_INPUT_HOST + ". It should be a valid hostname without protocol or port."); + } + // Other properties to validate... + } +} diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java new file mode 100644 index 0000000..8ad05bd --- /dev/null +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -0,0 +1,4899 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications to the original nifi code for the ONAP project are made + * available under the Apache License, Version 2.0 + */ +package org.apache.nifi.web; + +import com.google.common.collect.Sets; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.nifi.action.Action; +import org.apache.nifi.action.Component; +import org.apache.nifi.action.FlowChangeAction; +import org.apache.nifi.action.Operation; +import org.apache.nifi.action.details.FlowChangePurgeDetails; +import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.AccessPolicy; +import org.apache.nifi.authorization.AuthorizableLookup; +import org.apache.nifi.authorization.AuthorizationRequest; +import org.apache.nifi.authorization.AuthorizationResult; +import org.apache.nifi.authorization.AuthorizationResult.Result; +import org.apache.nifi.authorization.AuthorizeAccess; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.Group; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.User; +import org.apache.nifi.authorization.UserContextKeys; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource; +import org.apache.nifi.authorization.resource.OperationAuthorizable; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; +import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; +import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.coordination.node.OffloadCode; +import org.apache.nifi.cluster.event.NodeEvent; +import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.RequiredPermission; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ComponentNode; +import org.apache.nifi.controller.Counter; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.Snippet; +import org.apache.nifi.controller.Template; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; +import org.apache.nifi.controller.repository.claim.ContentDirection; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceReference; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.diagnostics.SystemDiagnostics; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.ProcessGroupCounts; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.history.History; +import org.apache.nifi.history.HistoryQuery; +import org.apache.nifi.history.PreviousValue; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.authorization.Permissions; +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.flow.FlowRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedComponent; +import org.apache.nifi.registry.flow.VersionedConnection; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.diff.ComparableDataFlow; +import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; +import org.apache.nifi.registry.flow.diff.DifferenceType; +import org.apache.nifi.registry.flow.diff.FlowComparator; +import org.apache.nifi.registry.flow.diff.FlowComparison; +import org.apache.nifi.registry.flow.diff.FlowDifference; +import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; +import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinQuery; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; +import org.apache.nifi.util.BundleUtils; +import org.apache.nifi.util.FlowDifferenceFilters; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.api.dto.AccessPolicyDTO; +import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; +import org.apache.nifi.web.api.dto.BucketDTO; +import org.apache.nifi.web.api.dto.BulletinBoardDTO; +import org.apache.nifi.web.api.dto.BulletinDTO; +import org.apache.nifi.web.api.dto.BulletinQueryDTO; +import org.apache.nifi.web.api.dto.BundleDTO; +import org.apache.nifi.web.api.dto.ClusterDTO; +import org.apache.nifi.web.api.dto.ComponentDTO; +import org.apache.nifi.web.api.dto.ComponentDifferenceDTO; +import org.apache.nifi.web.api.dto.ComponentHistoryDTO; +import org.apache.nifi.web.api.dto.ComponentReferenceDTO; +import org.apache.nifi.web.api.dto.ComponentRestrictionPermissionDTO; +import org.apache.nifi.web.api.dto.ComponentStateDTO; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ControllerConfigurationDTO; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.CounterDTO; +import org.apache.nifi.web.api.dto.CountersDTO; +import org.apache.nifi.web.api.dto.CountersSnapshotDTO; +import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; +import org.apache.nifi.web.api.dto.DtoFactory; +import org.apache.nifi.web.api.dto.EntityFactory; +import org.apache.nifi.web.api.dto.FlowConfigurationDTO; +import org.apache.nifi.web.api.dto.FlowFileDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.FunnelDTO; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.dto.PermissionsDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.PreviousValueDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; +import org.apache.nifi.web.api.dto.PropertyHistoryDTO; +import org.apache.nifi.web.api.dto.RegistryDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.dto.RequiredPermissionDTO; +import org.apache.nifi.web.api.dto.ResourceDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.SnippetDTO; +import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; +import org.apache.nifi.web.api.dto.TemplateDTO; +import org.apache.nifi.web.api.dto.UserDTO; +import org.apache.nifi.web.api.dto.UserGroupDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.dto.VersionedFlowDTO; +import org.apache.nifi.web.api.dto.action.HistoryDTO; +import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; +import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO; +import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO; +import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO; +import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO; +import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; +import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; +import org.apache.nifi.web.api.dto.search.SearchResultsDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; +import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.entity.AccessPolicyEntity; +import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; +import org.apache.nifi.web.api.entity.ActionEntity; +import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import org.apache.nifi.web.api.entity.BucketEntity; +import org.apache.nifi.web.api.entity.BulletinEntity; +import org.apache.nifi.web.api.entity.ComponentReferenceEntity; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ConnectionStatusEntity; +import org.apache.nifi.web.api.entity.ControllerBulletinsEntity; +import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.CurrentUserEntity; +import org.apache.nifi.web.api.entity.FlowComparisonEntity; +import org.apache.nifi.web.api.entity.FlowConfigurationEntity; +import org.apache.nifi.web.api.entity.FlowEntity; +import org.apache.nifi.web.api.entity.FunnelEntity; +import org.apache.nifi.web.api.entity.LabelEntity; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.entity.PortStatusEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; +import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; +import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProcessorStatusEntity; +import org.apache.nifi.web.api.entity.RegistryClientEntity; +import org.apache.nifi.web.api.entity.RegistryEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; +import org.apache.nifi.web.api.entity.SnippetEntity; +import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; +import org.apache.nifi.web.api.entity.TemplateEntity; +import org.apache.nifi.web.api.entity.TenantEntity; +import org.apache.nifi.web.api.entity.UserEntity; +import org.apache.nifi.web.api.entity.UserGroupEntity; +import org.apache.nifi.web.api.entity.VariableEntity; +import org.apache.nifi.web.api.entity.VariableRegistryEntity; +import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.apache.nifi.web.api.entity.VersionedFlowEntity; +import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; +import org.apache.nifi.web.controller.ControllerFacade; +import org.apache.nifi.web.dao.AccessPolicyDAO; +import org.apache.nifi.web.dao.ConnectionDAO; +import org.apache.nifi.web.dao.ControllerServiceDAO; +import org.apache.nifi.web.dao.FunnelDAO; +import org.apache.nifi.web.dao.LabelDAO; +import org.apache.nifi.web.dao.PortDAO; +import org.apache.nifi.web.dao.ProcessGroupDAO; +import org.apache.nifi.web.dao.ProcessorDAO; +import org.apache.nifi.web.dao.RegistryDAO; +import org.apache.nifi.web.dao.RemoteProcessGroupDAO; +import org.apache.nifi.web.dao.ReportingTaskDAO; +import org.apache.nifi.web.dao.SnippetDAO; +import org.apache.nifi.web.dao.TemplateDAO; +import org.apache.nifi.web.dao.UserDAO; +import org.apache.nifi.web.dao.UserGroupDAO; +import org.apache.nifi.web.revision.DeleteRevisionTask; +import org.apache.nifi.web.revision.ExpiredRevisionClaimException; +import org.apache.nifi.web.revision.RevisionClaim; +import org.apache.nifi.web.revision.RevisionManager; +import org.apache.nifi.web.revision.RevisionUpdate; +import org.apache.nifi.web.revision.StandardRevisionClaim; +import org.apache.nifi.web.revision.StandardRevisionUpdate; +import org.apache.nifi.web.revision.UpdateRevisionTask; +import org.apache.nifi.web.util.SnippetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Implementation of NiFiServiceFacade that performs revision checking. + */ +public class StandardNiFiServiceFacade implements NiFiServiceFacade { + private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class); + private static final int VALIDATION_WAIT_MILLIS = 50; + + // nifi core components + private ControllerFacade controllerFacade; + private SnippetUtils snippetUtils; + + // revision manager + private RevisionManager revisionManager; + private BulletinRepository bulletinRepository; + + // data access objects + private ProcessorDAO processorDAO; + private ProcessGroupDAO processGroupDAO; + private RemoteProcessGroupDAO remoteProcessGroupDAO; + private LabelDAO labelDAO; + private FunnelDAO funnelDAO; + private SnippetDAO snippetDAO; + private PortDAO inputPortDAO; + private PortDAO outputPortDAO; + private ConnectionDAO connectionDAO; + private ControllerServiceDAO controllerServiceDAO; + private ReportingTaskDAO reportingTaskDAO; + private TemplateDAO templateDAO; + private UserDAO userDAO; + private UserGroupDAO userGroupDAO; + private AccessPolicyDAO accessPolicyDAO; + private RegistryDAO registryDAO; + private ClusterCoordinator clusterCoordinator; + private HeartbeatMonitor heartbeatMonitor; + private LeaderElectionManager leaderElectionManager; + + // administrative services + private AuditService auditService; + + // flow registry + private FlowRegistryClient flowRegistryClient; + + // properties + private NiFiProperties properties; + private DtoFactory dtoFactory; + private EntityFactory entityFactory; + + private Authorizer authorizer; + + private AuthorizableLookup authorizableLookup; + + // ----------------------------------------- + // Synchronization methods + // ----------------------------------------- + @Override + public void authorizeAccess(final AuthorizeAccess authorizeAccess) { + authorizeAccess.authorize(authorizableLookup); + } + + @Override + public void verifyRevision(final Revision revision, final NiFiUser user) { + final Revision curRevision = revisionManager.getRevision(revision.getComponentId()); + if (revision.equals(curRevision)) { + return; + } + + throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified"); + } + + @Override + public void verifyRevisions(final Set revisions, final NiFiUser user) { + for (final Revision revision : revisions) { + verifyRevision(revision, user); + } + } + + @Override + public Set getRevisionsFromGroup(final String groupId, final Function> getComponents) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + final Set componentIds = getComponents.apply(group); + return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); + } + + @Override + public Set getRevisionsFromSnippet(final String snippetId) { + final Snippet snippet = snippetDAO.getSnippet(snippetId); + final Set componentIds = new HashSet<>(); + componentIds.addAll(snippet.getProcessors().keySet()); + componentIds.addAll(snippet.getFunnels().keySet()); + componentIds.addAll(snippet.getLabels().keySet()); + componentIds.addAll(snippet.getConnections().keySet()); + componentIds.addAll(snippet.getInputPorts().keySet()); + componentIds.addAll(snippet.getOutputPorts().keySet()); + componentIds.addAll(snippet.getProcessGroups().keySet()); + componentIds.addAll(snippet.getRemoteProcessGroups().keySet()); + return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); + } + + // ----------------------------------------- + // Verification Operations + // ----------------------------------------- + + @Override + public void verifyListQueue(final String connectionId) { + connectionDAO.verifyList(connectionId); + } + + @Override + public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) { + connectionDAO.verifyCreate(groupId, connectionDTO); + } + + @Override + public void verifyUpdateConnection(final ConnectionDTO connectionDTO) { + // if connection does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (connectionDAO.hasConnection(connectionDTO.getId())) { + connectionDAO.verifyUpdate(connectionDTO); + } else { + connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO); + } + } + + @Override + public void verifyDeleteConnection(final String connectionId) { + connectionDAO.verifyDelete(connectionId); + } + + @Override + public void verifyDeleteFunnel(final String funnelId) { + funnelDAO.verifyDelete(funnelId); + } + + @Override + public void verifyUpdateInputPort(final PortDTO inputPortDTO) { + // if connection does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (inputPortDAO.hasPort(inputPortDTO.getId())) { + inputPortDAO.verifyUpdate(inputPortDTO); + } + } + + @Override + public void verifyDeleteInputPort(final String inputPortId) { + inputPortDAO.verifyDelete(inputPortId); + } + + @Override + public void verifyUpdateOutputPort(final PortDTO outputPortDTO) { + // if connection does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (outputPortDAO.hasPort(outputPortDTO.getId())) { + outputPortDAO.verifyUpdate(outputPortDTO); + } + } + + @Override + public void verifyDeleteOutputPort(final String outputPortId) { + outputPortDAO.verifyDelete(outputPortId); + } + + @Override + public void verifyCreateProcessor(ProcessorDTO processorDTO) { + processorDAO.verifyCreate(processorDTO); + } + + @Override + public void verifyUpdateProcessor(final ProcessorDTO processorDTO) { + // if group does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (processorDAO.hasProcessor(processorDTO.getId())) { + processorDAO.verifyUpdate(processorDTO); + } else { + verifyCreateProcessor(processorDTO); + } + } + + @Override + public void verifyDeleteProcessor(final String processorId) { + processorDAO.verifyDelete(processorId); + } + + @Override + public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set componentIds) { + processGroupDAO.verifyScheduleComponents(groupId, state, componentIds); + } + + @Override + public void verifyEnableComponents(String processGroupId, ScheduledState state, Set componentIds) { + processGroupDAO.verifyEnableComponents(processGroupId, state, componentIds); + } + + @Override + public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection serviceIds) { + processGroupDAO.verifyActivateControllerServices(state, serviceIds); + } + + @Override + public void verifyDeleteProcessGroup(final String groupId) { + processGroupDAO.verifyDelete(groupId); + } + + @Override + public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) { + // if remote group does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) { + remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO); + } + } + + @Override + public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { + remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); + } + + @Override + public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { + remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); + } + + @Override + public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) { + remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId); + } + + @Override + public void verifyCreateControllerService(ControllerServiceDTO controllerServiceDTO) { + controllerServiceDAO.verifyCreate(controllerServiceDTO); + } + + @Override + public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) { + // if service does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) { + controllerServiceDAO.verifyUpdate(controllerServiceDTO); + } else { + verifyCreateControllerService(controllerServiceDTO); + } + } + + @Override + public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { + controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); + } + + @Override + public void verifyDeleteControllerService(final String controllerServiceId) { + controllerServiceDAO.verifyDelete(controllerServiceId); + } + + @Override + public void verifyCreateReportingTask(ReportingTaskDTO reportingTaskDTO) { + reportingTaskDAO.verifyCreate(reportingTaskDTO); + } + + @Override + public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) { + // if tasks does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) { + reportingTaskDAO.verifyUpdate(reportingTaskDTO); + } else { + verifyCreateReportingTask(reportingTaskDTO); + } + } + + @Override + public void verifyDeleteReportingTask(final String reportingTaskId) { + reportingTaskDAO.verifyDelete(reportingTaskId); + } + + // ----------------------------------------- + // Write Operations + // ----------------------------------------- + + @Override + public AccessPolicyEntity updateAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) { + final Authorizable authorizable = authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + authorizable, + () -> accessPolicyDAO.updateAccessPolicy(accessPolicyDTO), + accessPolicy -> { + final Set users = accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); + final Set userGroups = accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); + final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); + return dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference); + }); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizable); + return entityFactory.createAccessPolicyEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); + } + + @Override + public UserEntity updateUser(final Revision revision, final UserDTO userDTO) { + final Authorizable usersAuthorizable = authorizableLookup.getTenant(); + final Set groups = userGroupDAO.getUserGroupsForUser(userDTO.getId()); + final Set policies = userGroupDAO.getAccessPoliciesForUser(userDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + usersAuthorizable, + () -> userDAO.updateUser(userDTO), + user -> { + final Set tenantEntities = groups.stream().map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); + final Set policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); + return dtoFactory.createUserDto(user, tenantEntities, policyEntities); + }); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(usersAuthorizable); + return entityFactory.createUserEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); + } + + @Override + public UserGroupEntity updateUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) { + final Authorizable userGroupsAuthorizable = authorizableLookup.getTenant(); + final Set policies = userGroupDAO.getAccessPoliciesForUserGroup(userGroupDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + userGroupsAuthorizable, + () -> userGroupDAO.updateUserGroup(userGroupDTO), + userGroup -> { + final Set tenantEntities = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); + final Set policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); + return dtoFactory.createUserGroupDto(userGroup, tenantEntities, policyEntities); + } + ); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(userGroupsAuthorizable); + return entityFactory.createUserGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); + } + + @Override + public ConnectionEntity updateConnection(final Revision revision, final ConnectionDTO connectionDTO) { + final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId()); + + final RevisionUpdate snapshot = updateComponent( + revision, + connectionNode, + () -> connectionDAO.updateConnection(connectionDTO), + connection -> dtoFactory.createConnectionDto(connection)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectionNode); + final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionNode.getIdentifier())); + return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status); + } + + @Override + public ProcessorEntity updateProcessor(final Revision revision, final ProcessorDTO processorDTO) { + // get the component, ensure we have access to it, and perform the update request + final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + processorNode, + () -> processorDAO.updateProcessor(processorDTO), + proc -> { + awaitValidationCompletion(proc); + return dtoFactory.createProcessorDto(proc); + }); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processorNode); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processorNode)); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorNode.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); + } + + private void awaitValidationCompletion(final ComponentNode component) { + component.getValidationStatus(VALIDATION_WAIT_MILLIS, TimeUnit.MILLISECONDS); + } + + @Override + public LabelEntity updateLabel(final Revision revision, final LabelDTO labelDTO) { + final Label labelNode = labelDAO.getLabel(labelDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + labelNode, + () -> labelDAO.updateLabel(labelDTO), + label -> dtoFactory.createLabelDto(label)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(labelNode); + return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); + } + + @Override + public FunnelEntity updateFunnel(final Revision revision, final FunnelDTO funnelDTO) { + final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + funnelNode, + () -> funnelDAO.updateFunnel(funnelDTO), + funnel -> dtoFactory.createFunnelDto(funnel)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnelNode); + return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); + } + + + /** + * Updates a component with the given revision, using the provided supplier to call + * into the appropriate DAO and the provided function to convert the component into a DTO. + * + * @param revision the current revision + * @param daoUpdate a Supplier that will update the component via the appropriate DAO + * @param dtoCreation a Function to convert a component into a dao + * @param the DTO Type of the updated component + * @param the Component Type of the updated component + * @return A RevisionUpdate that represents the new configuration + */ + private RevisionUpdate updateComponent(final Revision revision, final Authorizable authorizable, final Supplier daoUpdate, final Function dtoCreation) { + try { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask() { + @Override + public RevisionUpdate update() { + // get the updated component + final C component = daoUpdate.get(); + + // save updated controller + controllerFacade.save(); + + final D dto = dtoCreation.apply(component); + + final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); + final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); + return new StandardRevisionUpdate<>(dto, lastModification); + } + }); + + return updatedComponent; + } catch (final ExpiredRevisionClaimException erce) { + throw new InvalidRevisionException("Failed to update component " + authorizable, erce); + } + } + + + @Override + public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set affectedComponentIds) { + // if snippet does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (snippetDAO.hasSnippet(snippetDto.getId())) { + snippetDAO.verifyUpdateSnippetComponent(snippetDto); + } + } + + @Override + public SnippetEntity updateSnippet(final Set revisions, final SnippetDTO snippetDto) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions); + + final RevisionUpdate snapshot; + try { + snapshot = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask() { + @Override + public RevisionUpdate update() { + // get the updated component + final Snippet snippet = snippetDAO.updateSnippetComponents(snippetDto); + + // drop the snippet + snippetDAO.dropSnippet(snippet.getId()); + + // save updated controller + controllerFacade.save(); + + // increment the revisions + final Set updatedRevisions = revisions.stream().map(revision -> { + final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); + return currentRevision.incrementRevision(revision.getClientId()); + }).collect(Collectors.toSet()); + + final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); + return new StandardRevisionUpdate<>(dto, null, updatedRevisions); + } + }); + } catch (final ExpiredRevisionClaimException e) { + throw new InvalidRevisionException("Failed to update Snippet", e); + } + + return entityFactory.createSnippetEntity(snapshot.getComponent()); + } + + @Override + public PortEntity updateInputPort(final Revision revision, final PortDTO inputPortDTO) { + final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + inputPortNode, + () -> inputPortDAO.updatePort(inputPortDTO), + port -> dtoFactory.createPortDto(port)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPortNode); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPortNode)); + final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortNode.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); + } + + @Override + public PortEntity updateOutputPort(final Revision revision, final PortDTO outputPortDTO) { + final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + outputPortNode, + () -> outputPortDAO.updatePort(outputPortDTO), + port -> dtoFactory.createPortDto(port)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPortNode); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPortNode), NiFiUserUtils.getNiFiUser()); + final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortNode.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); + } + + @Override + public RemoteProcessGroupEntity updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) { + final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); + final RevisionUpdate snapshot = updateComponent( + revision, + remoteProcessGroupNode, + () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO), + remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); + final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupNode, + controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroupNode.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, permissions, operatePermissions, status, bulletinEntities); + } + + @Override + public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort( + final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { + + final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); + final RevisionUpdate snapshot = updateComponent( + revision, + remoteProcessGroupNode, + () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), + remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions); + } + + @Override + public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort( + final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { + + final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); + final RevisionUpdate snapshot = updateComponent( + revision, + remoteProcessGroupNode, + () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), + remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions); + } + + @Override + public Set getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { + final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); + if (group == null) { + throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); + } + + final Map variableMap = new HashMap<>(); + variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null + .map(VariableEntity::getVariable) + .forEach(var -> variableMap.put(var.getName(), var.getValue())); + + final Set affectedComponentDtos = new HashSet<>(); + + final Set updatedVariableNames = getUpdatedVariables(group, variableMap); + for (final String variableName : updatedVariableNames) { + final Set affectedComponents = group.getComponentsAffectedByVariable(variableName); + + for (final ComponentNode component : affectedComponents) { + if (component instanceof ProcessorNode) { + final ProcessorNode procNode = (ProcessorNode) component; + if (procNode.isRunning()) { + affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode)); + } + } else if (component instanceof ControllerServiceNode) { + final ControllerServiceNode serviceNode = (ControllerServiceNode) component; + if (serviceNode.isActive()) { + affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode)); + } + } else { + throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable"); + } + } + } + + return affectedComponentDtos; + } + + @Override + public Set getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { + final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); + if (group == null) { + throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); + } + + final Map variableMap = new HashMap<>(); + variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null + .map(VariableEntity::getVariable) + .forEach(var -> variableMap.put(var.getName(), var.getValue())); + + final Set affectedComponentEntities = new HashSet<>(); + + final Set updatedVariableNames = getUpdatedVariables(group, variableMap); + for (final String variableName : updatedVariableNames) { + final Set affectedComponents = group.getComponentsAffectedByVariable(variableName); + affectedComponentEntities.addAll(dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager)); + } + + return affectedComponentEntities; + } + + private Set getUpdatedVariables(final ProcessGroup group, final Map newVariableValues) { + final Set updatedVariableNames = new HashSet<>(); + + final ComponentVariableRegistry registry = group.getVariableRegistry(); + for (final Map.Entry entry : newVariableValues.entrySet()) { + final String varName = entry.getKey(); + final String newValue = entry.getValue(); + + final String curValue = registry.getVariableValue(varName); + if (!Objects.equals(newValue, curValue)) { + updatedVariableNames.add(varName); + } + } + + return updatedVariableNames; + } + + + @Override + public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) { + final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); + final RevisionUpdate snapshot = updateComponent(revision, + processGroupNode, + () -> processGroupDAO.updateVariableRegistry(variableRegistryDto), + processGroup -> dtoFactory.createVariableRegistryDto(processGroup, revisionManager)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions); + } + + + @Override + public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) { + final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + processGroupNode, + () -> processGroupDAO.updateProcessGroup(processGroupDTO), + processGroup -> dtoFactory.createProcessGroupDto(processGroup)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities); + } + + @Override + public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) { + if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) { + processGroupDAO.verifyUpdate(processGroupDTO); + } + } + + @Override + public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map componentRevisions) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new + UpdateRevisionTask() { + @Override + public RevisionUpdate update() { + // schedule the components + processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet()); + + // update the revisions + final Map updatedRevisions = new HashMap<>(); + for (final Revision revision : componentRevisions.values()) { + final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); + updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); + } + + // save + controllerFacade.save(); + + // gather details for response + final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); + entity.setId(processGroupId); + entity.setState(state.name()); + return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); + } + }); + + return updatedComponent.getComponent(); + } + + @Override + public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map componentRevisions) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new + UpdateRevisionTask() { + @Override + public RevisionUpdate update() { + // schedule the components + processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); + + // update the revisions + final Map updatedRevisions = new HashMap<>(); + for (final Revision revision : componentRevisions.values()) { + final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); + updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); + } + + // save + controllerFacade.save(); + + // gather details for response + final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); + entity.setId(processGroupId); + entity.setState(state.name()); + return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); + } + }); + + return updatedComponent.getComponent(); + } + + @Override + public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map serviceRevisions) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user, + new UpdateRevisionTask() { + @Override + public RevisionUpdate update() { + // schedule the components + processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet()); + + // update the revisions + final Map updatedRevisions = new HashMap<>(); + for (final Revision revision : serviceRevisions.values()) { + final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); + updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); + } + + // save + controllerFacade.save(); + + // gather details for response + final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity(); + entity.setId(processGroupId); + entity.setState(state.name()); + return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); + } + }); + + return updatedComponent.getComponent(); + } + + + @Override + public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) { + final RevisionUpdate updatedComponent = updateComponent( + revision, + controllerFacade, + () -> { + if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) { + controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount()); + } + if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) { + controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount()); + } + + return controllerConfigurationDTO; + }, + controller -> dtoFactory.createControllerConfigurationDto(controllerFacade)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade); + final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(updatedComponent.getLastModification()); + return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions); + } + + + @Override + public NodeDTO updateNode(final NodeDTO nodeDTO) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user == null) { + throw new WebApplicationException(new Throwable("Unable to access details for current user.")); + } + final String userDn = user.getIdentity(); + + final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeDTO.getNodeId()); + if (nodeId == null) { + throw new UnknownNodeException("No node exists with ID " + nodeDTO.getNodeId()); + } + + + if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { + clusterCoordinator.requestNodeConnect(nodeId, userDn); + } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) { + clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED, + "User " + userDn + " requested that node be offloaded"); + } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { + clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, + "User " + userDn + " requested that node be disconnected from cluster"); + } + + return getNode(nodeId); + } + + @Override + public CounterDTO updateCounter(final String counterId) { + return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId)); + } + + @Override + public void verifyCanClearProcessorState(final String processorId) { + processorDAO.verifyClearState(processorId); + } + + @Override + public void clearProcessorState(final String processorId) { + processorDAO.clearState(processorId); + } + + @Override + public void verifyCanClearControllerServiceState(final String controllerServiceId) { + controllerServiceDAO.verifyClearState(controllerServiceId); + } + + @Override + public void clearControllerServiceState(final String controllerServiceId) { + controllerServiceDAO.clearState(controllerServiceId); + } + + @Override + public void verifyCanClearReportingTaskState(final String reportingTaskId) { + reportingTaskDAO.verifyClearState(reportingTaskId); + } + + @Override + public void clearReportingTaskState(final String reportingTaskId) { + reportingTaskDAO.clearState(reportingTaskId); + } + + @Override + public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) { + final Connection connection = connectionDAO.getConnection(connectionId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); + final ConnectionDTO snapshot = deleteComponent( + revision, + connection.getResource(), + () -> connectionDAO.deleteConnection(connectionId), + false, // no policies to remove + dtoFactory.createConnectionDto(connection)); + + return entityFactory.createConnectionEntity(snapshot, null, permissions, null); + } + + @Override + public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) { + return dtoFactory.createDropRequestDTO(connectionDAO.deleteFlowFileDropRequest(connectionId, dropRequestId)); + } + + @Override + public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) { + final Connection connection = connectionDAO.getConnection(connectionId); + final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(connectionId, listingRequestId)); + + // include whether the source and destination are running + if (connection.getSource() != null) { + listRequest.setSourceRunning(connection.getSource().isRunning()); + } + if (connection.getDestination() != null) { + listRequest.setDestinationRunning(connection.getDestination().isRunning()); + } + + return listRequest; + } + + @Override + public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) { + final ProcessorNode processor = processorDAO.getProcessor(processorId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); + final ProcessorDTO snapshot = deleteComponent( + revision, + processor.getResource(), + () -> processorDAO.deleteProcessor(processorId), + true, + dtoFactory.createProcessorDto(processor)); + + return entityFactory.createProcessorEntity(snapshot, null, permissions, operatePermissions, null, null); + } + + @Override + public ProcessorEntity terminateProcessor(final String processorId) { + processorDAO.terminate(processorId); + return getProcessor(processorId); + } + + @Override + public void verifyTerminateProcessor(final String processorId) { + processorDAO.verifyTerminate(processorId); + } + + @Override + public LabelEntity deleteLabel(final Revision revision, final String labelId) { + final Label label = labelDAO.getLabel(labelId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); + final LabelDTO snapshot = deleteComponent( + revision, + label.getResource(), + () -> labelDAO.deleteLabel(labelId), + true, + dtoFactory.createLabelDto(label)); + + return entityFactory.createLabelEntity(snapshot, null, permissions); + } + + @Override + public UserEntity deleteUser(final Revision revision, final String userId) { + final User user = userDAO.getUser(userId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); + final Set userGroups = user != null ? userGroupDAO.getUserGroupsForUser(userId).stream() + .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null; + final Set policyEntities = user != null ? userGroupDAO.getAccessPoliciesForUser(userId).stream() + .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()) : null; + + final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userId; + final UserDTO snapshot = deleteComponent( + revision, + new Resource() { + @Override + public String getIdentifier() { + return resourceIdentifier; + } + + @Override + public String getName() { + return resourceIdentifier; + } + + @Override + public String getSafeDescription() { + return "User " + userId; + } + }, + () -> userDAO.deleteUser(userId), + false, // no user specific policies to remove + dtoFactory.createUserDto(user, userGroups, policyEntities)); + + return entityFactory.createUserEntity(snapshot, null, permissions); + } + + @Override + public UserGroupEntity deleteUserGroup(final Revision revision, final String userGroupId) { + final Group userGroup = userGroupDAO.getUserGroup(userGroupId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); + final Set users = userGroup != null ? userGroup.getUsers().stream() + .map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null; + final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream() + .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); + + final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userGroupId; + final UserGroupDTO snapshot = deleteComponent( + revision, + new Resource() { + @Override + public String getIdentifier() { + return resourceIdentifier; + } + + @Override + public String getName() { + return resourceIdentifier; + } + + @Override + public String getSafeDescription() { + return "User Group " + userGroupId; + } + }, + () -> userGroupDAO.deleteUserGroup(userGroupId), + false, // no user group specific policies to remove + dtoFactory.createUserGroupDto(userGroup, users, policyEntities)); + + return entityFactory.createUserGroupEntity(snapshot, null, permissions); + } + + @Override + public AccessPolicyEntity deleteAccessPolicy(final Revision revision, final String accessPolicyId) { + final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); + final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyId)); + final Set userGroups = accessPolicy != null ? accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null; + final Set users = accessPolicy != null ? accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null; + final AccessPolicyDTO snapshot = deleteComponent( + revision, + new Resource() { + @Override + public String getIdentifier() { + return accessPolicy.getResource(); + } + + @Override + public String getName() { + return accessPolicy.getResource(); + } + + @Override + public String getSafeDescription() { + return "Policy " + accessPolicyId; + } + }, + () -> accessPolicyDAO.deleteAccessPolicy(accessPolicyId), + false, // no need to clean up any policies as it's already been removed above + dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference)); + + return entityFactory.createAccessPolicyEntity(snapshot, null, permissions); + } + + @Override + public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) { + final Funnel funnel = funnelDAO.getFunnel(funnelId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); + final FunnelDTO snapshot = deleteComponent( + revision, + funnel.getResource(), + () -> funnelDAO.deleteFunnel(funnelId), + true, + dtoFactory.createFunnelDto(funnel)); + + return entityFactory.createFunnelEntity(snapshot, null, permissions); + } + + /** + * Deletes a component using the Optimistic Locking Manager + * + * @param revision the current revision + * @param resource the resource being removed + * @param deleteAction the action that deletes the component via the appropriate DAO object + * @param cleanUpPolicies whether or not the policies for this resource should be removed as well - not necessary when there are + * no component specific policies or if the policies of the component are inherited + * @return a dto that represents the new configuration + */ + private D deleteComponent(final Revision revision, final Resource resource, final Runnable deleteAction, final boolean cleanUpPolicies, final D dto) { + final RevisionClaim claim = new StandardRevisionClaim(revision); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask() { + @Override + public D performTask() { + logger.debug("Attempting to delete component {} with claim {}", resource.getIdentifier(), claim); + + // run the delete action + deleteAction.run(); + + // save the flow + controllerFacade.save(); + logger.debug("Deletion of component {} was successful", resource.getIdentifier()); + + if (cleanUpPolicies) { + cleanUpPolicies(resource); + } + + return dto; + } + }); + } + + /** + * Clean up the policies for the specified component resource. + * + * @param componentResource the resource for the component + */ + private void cleanUpPolicies(final Resource componentResource) { + // ensure the authorizer supports configuration + if (accessPolicyDAO.supportsConfigurableAuthorizer()) { + final List resources = new ArrayList<>(); + resources.add(componentResource); + resources.add(ResourceFactory.getDataResource(componentResource)); + resources.add(ResourceFactory.getProvenanceDataResource(componentResource)); + resources.add(ResourceFactory.getDataTransferResource(componentResource)); + resources.add(ResourceFactory.getPolicyResource(componentResource)); + + for (final Resource resource : resources) { + for (final RequestAction action : RequestAction.values()) { + try { + // since the component is being deleted, also delete any relevant access policies + final AccessPolicy readPolicy = accessPolicyDAO.getAccessPolicy(action, resource.getIdentifier()); + if (readPolicy != null) { + accessPolicyDAO.deleteAccessPolicy(readPolicy.getIdentifier()); + } + } catch (final Exception e) { + logger.warn(String.format("Unable to remove access policy for %s %s after component removal.", action, resource.getIdentifier()), e); + } + } + } + } + } + + @Override + public void verifyDeleteSnippet(final String snippetId, final Set affectedComponentIds) { + snippetDAO.verifyDeleteSnippetComponents(snippetId); + } + + @Override + public SnippetEntity deleteSnippet(final Set revisions, final String snippetId) { + final Snippet snippet = snippetDAO.getSnippet(snippetId); + + // grab the resources in the snippet so we can delete the policies afterwards + final Set snippetResources = new HashSet<>(); + snippet.getProcessors().keySet().forEach(id -> snippetResources.add(processorDAO.getProcessor(id).getResource())); + snippet.getInputPorts().keySet().forEach(id -> snippetResources.add(inputPortDAO.getPort(id).getResource())); + snippet.getOutputPorts().keySet().forEach(id -> snippetResources.add(outputPortDAO.getPort(id).getResource())); + snippet.getFunnels().keySet().forEach(id -> snippetResources.add(funnelDAO.getFunnel(id).getResource())); + snippet.getLabels().keySet().forEach(id -> snippetResources.add(labelDAO.getLabel(id).getResource())); + snippet.getRemoteProcessGroups().keySet().forEach(id -> snippetResources.add(remoteProcessGroupDAO.getRemoteProcessGroup(id).getResource())); + snippet.getProcessGroups().keySet().forEach(id -> { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id); + + // add the process group + snippetResources.add(processGroup.getResource()); + + // add each encapsulated component + processGroup.findAllProcessors().forEach(processor -> snippetResources.add(processor.getResource())); + processGroup.findAllInputPorts().forEach(inputPort -> snippetResources.add(inputPort.getResource())); + processGroup.findAllOutputPorts().forEach(outputPort -> snippetResources.add(outputPort.getResource())); + processGroup.findAllFunnels().forEach(funnel -> snippetResources.add(funnel.getResource())); + processGroup.findAllLabels().forEach(label -> snippetResources.add(label.getResource())); + processGroup.findAllProcessGroups().forEach(childGroup -> snippetResources.add(childGroup.getResource())); + processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> snippetResources.add(remoteProcessGroup.getResource())); + processGroup.findAllTemplates().forEach(template -> snippetResources.add(template.getResource())); + processGroup.findAllControllerServices().forEach(controllerService -> snippetResources.add(controllerService.getResource())); + }); + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionClaim claim = new StandardRevisionClaim(revisions); + final SnippetDTO dto = revisionManager.deleteRevision(claim, user, new DeleteRevisionTask() { + @Override + public SnippetDTO performTask() { + // delete the components in the snippet + snippetDAO.deleteSnippetComponents(snippetId); + + // drop the snippet + snippetDAO.dropSnippet(snippetId); + + // save + controllerFacade.save(); + + // create the dto for the snippet that was just removed + return dtoFactory.createSnippetDto(snippet); + } + }); + + // clean up component policies + snippetResources.forEach(resource -> cleanUpPolicies(resource)); + + return entityFactory.createSnippetEntity(dto); + } + + @Override + public PortEntity deleteInputPort(final Revision revision, final String inputPortId) { + final Port port = inputPortDAO.getPort(inputPortId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); + final PortDTO snapshot = deleteComponent( + revision, + port.getResource(), + () -> inputPortDAO.deletePort(inputPortId), + true, + dtoFactory.createPortDto(port)); + + return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null); + } + + @Override + public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) { + final Port port = outputPortDAO.getPort(outputPortId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); + final PortDTO snapshot = deleteComponent( + revision, + port.getResource(), + () -> outputPortDAO.deletePort(outputPortId), + true, + dtoFactory.createPortDto(port)); + + return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null); + } + + @Override + public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); + + // grab the resources in the snippet so we can delete the policies afterwards + final Set groupResources = new HashSet<>(); + processGroup.findAllProcessors().forEach(processor -> groupResources.add(processor.getResource())); + processGroup.findAllInputPorts().forEach(inputPort -> groupResources.add(inputPort.getResource())); + processGroup.findAllOutputPorts().forEach(outputPort -> groupResources.add(outputPort.getResource())); + processGroup.findAllFunnels().forEach(funnel -> groupResources.add(funnel.getResource())); + processGroup.findAllLabels().forEach(label -> groupResources.add(label.getResource())); + processGroup.findAllProcessGroups().forEach(childGroup -> groupResources.add(childGroup.getResource())); + processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> groupResources.add(remoteProcessGroup.getResource())); + processGroup.findAllTemplates().forEach(template -> groupResources.add(template.getResource())); + processGroup.findAllControllerServices().forEach(controllerService -> groupResources.add(controllerService.getResource())); + + final ProcessGroupDTO snapshot = deleteComponent( + revision, + processGroup.getResource(), + () -> processGroupDAO.deleteProcessGroup(groupId), + true, + dtoFactory.createProcessGroupDto(processGroup)); + + // delete all applicable component policies + groupResources.forEach(groupResource -> cleanUpPolicies(groupResource)); + + return entityFactory.createProcessGroupEntity(snapshot, null, permissions, null, null); + } + + @Override + public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) { + final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); + final RemoteProcessGroupDTO snapshot = deleteComponent( + revision, + remoteProcessGroup.getResource(), + () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId), + true, + dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); + + return entityFactory.createRemoteProcessGroupEntity(snapshot, null, permissions, operatePermissions, null, null); + } + + @Override + public void deleteTemplate(final String id) { + // delete the template and save the flow + templateDAO.deleteTemplate(id); + controllerFacade.save(); + } + + @Override + public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) { + final RevisionUpdate snapshot = createComponent( + revision, + connectionDTO, + () -> connectionDAO.createConnection(groupId, connectionDTO), + connection -> dtoFactory.createConnectionDto(connection)); + + final Connection connection = connectionDAO.getConnection(connectionDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); + final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId())); + return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status); + } + + @Override + public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) { + return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId)); + } + + @Override + public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) { + final Connection connection = connectionDAO.getConnection(connectionId); + final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(connectionId, listingRequestId)); + + // include whether the source and destination are running + if (connection.getSource() != null) { + listRequest.setSourceRunning(connection.getSource().isRunning()); + } + if (connection.getDestination() != null) { + listRequest.setDestinationRunning(connection.getDestination().isRunning()); + } + + return listRequest; + } + + @Override + public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { + final RevisionUpdate snapshot = createComponent( + revision, + processorDTO, + () -> processorDAO.createProcessor(groupId, processorDTO), + processor -> { + awaitValidationCompletion(processor); + return dtoFactory.createProcessorDto(processor); + }); + + final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); + } + + @Override + public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) { + final RevisionUpdate snapshot = createComponent( + revision, + labelDTO, + () -> labelDAO.createLabel(groupId, labelDTO), + label -> dtoFactory.createLabelDto(label)); + + final Label label = labelDAO.getLabel(labelDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); + return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); + } + + /** + * Creates a component using the optimistic locking manager. + * + * @param componentDto the DTO that will be used to create the component + * @param daoCreation A Supplier that will create the NiFi Component to use + * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO + * @param the DTO Type + * @param the NiFi Component Type + * @return a RevisionUpdate that represents the updated configuration + */ + private RevisionUpdate createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier daoCreation, final Function dtoCreation) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // read lock on the containing group + // request claim for component to be created... revision already verified (version == 0) + final RevisionClaim claim = new StandardRevisionClaim(revision); + + // update revision through revision manager + return revisionManager.updateRevision(claim, user, () -> { + // add the component + final C component = daoCreation.get(); + + // save the flow + controllerFacade.save(); + + final D dto = dtoCreation.apply(component); + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate<>(dto, lastMod); + }); + } + + @Override + public BulletinEntity createBulletin(final BulletinDTO bulletinDTO, final Boolean canRead){ + final Bulletin bulletin = BulletinFactory.createBulletin(bulletinDTO.getCategory(),bulletinDTO.getLevel(),bulletinDTO.getMessage()); + bulletinRepository.addBulletin(bulletin); + return entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin),canRead); + } + + @Override + public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) { + final RevisionUpdate snapshot = createComponent( + revision, + funnelDTO, + () -> funnelDAO.createFunnel(groupId, funnelDTO), + funnel -> dtoFactory.createFunnelDto(funnel)); + + final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); + return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); + } + + @Override + public AccessPolicyEntity createAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) { + final Authorizable tenantAuthorizable = authorizableLookup.getTenant(); + final String creator = NiFiUserUtils.getNiFiUserIdentity(); + + final AccessPolicy newAccessPolicy = accessPolicyDAO.createAccessPolicy(accessPolicyDTO); + final ComponentReferenceEntity componentReference = createComponentReferenceEntity(newAccessPolicy.getResource()); + final AccessPolicyDTO newAccessPolicyDto = dtoFactory.createAccessPolicyDto(newAccessPolicy, + newAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()), + newAccessPolicy.getUsers().stream().map(userId -> { + final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId)); + return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(userDAO.getUser(userId)), userRevision, + dtoFactory.createPermissionsDto(tenantAuthorizable)); + }).collect(Collectors.toSet()), componentReference); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId())); + return entityFactory.createAccessPolicyEntity(newAccessPolicyDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); + } + + @Override + public UserEntity createUser(final Revision revision, final UserDTO userDTO) { + final String creator = NiFiUserUtils.getNiFiUserIdentity(); + final User newUser = userDAO.createUser(userDTO); + final Set tenantEntities = userGroupDAO.getUserGroupsForUser(newUser.getIdentifier()).stream() + .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); + final Set policyEntities = userGroupDAO.getAccessPoliciesForUser(newUser.getIdentifier()).stream() + .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); + final UserDTO newUserDto = dtoFactory.createUserDto(newUser, tenantEntities, policyEntities); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); + return entityFactory.createUserEntity(newUserDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); + } + + private ComponentReferenceEntity createComponentReferenceEntity(final String resource) { + ComponentReferenceEntity componentReferenceEntity = null; + try { + // get the component authorizable + Authorizable componentAuthorizable = authorizableLookup.getAuthorizableFromResource(resource); + + // if this represents an authorizable whose policy permissions are enforced through the base resource, + // get the underlying base authorizable for the component reference + if (componentAuthorizable instanceof EnforcePolicyPermissionsThroughBaseResource) { + componentAuthorizable = ((EnforcePolicyPermissionsThroughBaseResource) componentAuthorizable).getBaseAuthorizable(); + } + + final ComponentReferenceDTO componentReference = dtoFactory.createComponentReferenceDto(componentAuthorizable); + if (componentReference != null) { + final PermissionsDTO componentReferencePermissions = dtoFactory.createPermissionsDto(componentAuthorizable); + final RevisionDTO componentReferenceRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(componentReference.getId())); + componentReferenceEntity = entityFactory.createComponentReferenceEntity(componentReference, componentReferenceRevision, componentReferencePermissions); + } + } catch (final ResourceNotFoundException e) { + // component not found for the specified resource + } + + return componentReferenceEntity; + } + + private AccessPolicySummaryEntity createAccessPolicySummaryEntity(final AccessPolicy ap) { + final ComponentReferenceEntity componentReference = createComponentReferenceEntity(ap.getResource()); + final AccessPolicySummaryDTO apSummary = dtoFactory.createAccessPolicySummaryDto(ap, componentReference); + final PermissionsDTO apPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(ap.getIdentifier())); + final RevisionDTO apRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(ap.getIdentifier())); + return entityFactory.createAccessPolicySummaryEntity(apSummary, apRevision, apPermissions); + } + + @Override + public UserGroupEntity createUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) { + final String creator = NiFiUserUtils.getNiFiUserIdentity(); + final Group newUserGroup = userGroupDAO.createUserGroup(userGroupDTO); + final Set tenantEntities = newUserGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); + final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(newUserGroup.getIdentifier()).stream() + .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); + final UserGroupDTO newUserGroupDto = dtoFactory.createUserGroupDto(newUserGroup, tenantEntities, policyEntities); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); + return entityFactory.createUserGroupEntity(newUserGroupDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); + } + + private void validateSnippetContents(final FlowSnippetDTO flow) { + // validate any processors + if (flow.getProcessors() != null) { + for (final ProcessorDTO processorDTO : flow.getProcessors()) { + final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); + processorDTO.setValidationStatus(processorNode.getValidationStatus().name()); + + final Collection validationErrors = processorNode.getValidationErrors(); + if (validationErrors != null && !validationErrors.isEmpty()) { + final List errors = new ArrayList<>(); + for (final ValidationResult validationResult : validationErrors) { + errors.add(validationResult.toString()); + } + processorDTO.setValidationErrors(errors); + } + } + } + + if (flow.getInputPorts() != null) { + for (final PortDTO portDTO : flow.getInputPorts()) { + final Port port = inputPortDAO.getPort(portDTO.getId()); + final Collection validationErrors = port.getValidationErrors(); + if (validationErrors != null && !validationErrors.isEmpty()) { + final List errors = new ArrayList<>(); + for (final ValidationResult validationResult : validationErrors) { + errors.add(validationResult.toString()); + } + portDTO.setValidationErrors(errors); + } + } + } + + if (flow.getOutputPorts() != null) { + for (final PortDTO portDTO : flow.getOutputPorts()) { + final Port port = outputPortDAO.getPort(portDTO.getId()); + final Collection validationErrors = port.getValidationErrors(); + if (validationErrors != null && !validationErrors.isEmpty()) { + final List errors = new ArrayList<>(); + for (final ValidationResult validationResult : validationErrors) { + errors.add(validationResult.toString()); + } + portDTO.setValidationErrors(errors); + } + } + } + + // get any remote process group issues + if (flow.getRemoteProcessGroups() != null) { + for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) { + final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); + + if (remoteProcessGroup.getAuthorizationIssue() != null) { + remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue())); + } + } + } + } + + @Override + public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) { + // create the new snippet + final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); + + // save the flow + controllerFacade.save(); + + // drop the snippet + snippetDAO.dropSnippet(snippetId); + + // post process new flow snippet + final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); + + final FlowEntity flowEntity = new FlowEntity(); + flowEntity.setFlow(flowDto); + return flowEntity; + } + + @Override + public SnippetEntity createSnippet(final SnippetDTO snippetDTO) { + // add the component + final Snippet snippet = snippetDAO.createSnippet(snippetDTO); + + // save the flow + controllerFacade.save(); + + final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); + final RevisionUpdate snapshot = new StandardRevisionUpdate<>(dto, null); + + return entityFactory.createSnippetEntity(snapshot.getComponent()); + } + + @Override + public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) { + final RevisionUpdate snapshot = createComponent( + revision, + inputPortDTO, + () -> inputPortDAO.createPort(groupId, inputPortDTO), + port -> dtoFactory.createPortDto(port)); + + final Port port = inputPortDAO.getPort(inputPortDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); + final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); + } + + @Override + public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) { + final RevisionUpdate snapshot = createComponent( + revision, + outputPortDTO, + () -> outputPortDAO.createPort(groupId, outputPortDTO), + port -> dtoFactory.createPortDto(port)); + + final Port port = outputPortDAO.getPort(outputPortDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); + final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); + } + + @Override + public ProcessGroupEntity createProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) { + final RevisionUpdate snapshot = createComponent( + revision, + processGroupDTO, + () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO), + processGroup -> dtoFactory.createProcessGroupDto(processGroup)); + + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); + final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities); + } + + @Override + public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) { + final RevisionUpdate snapshot = createComponent( + revision, + remoteProcessGroupDTO, + () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO), + remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); + + final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); + final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), + permissions, operatePermissions, status, bulletinEntities); + } + + @Override + public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) { + final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); + RemoteGroupPort port = rpg.getInputPort(remotePortId); + if (port != null) { + return port.hasIncomingConnection(); + } + + port = rpg.getOutputPort(remotePortId); + if (port != null) { + return !port.getConnections().isEmpty(); + } + + throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId); + } + + @Override + public void verifyCanAddTemplate(String groupId, String name) { + templateDAO.verifyCanAddTemplate(name, groupId); + } + + @Override + public void verifyComponentTypes(FlowSnippetDTO snippet) { + templateDAO.verifyComponentTypes(snippet); + } + + @Override + public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) { + controllerFacade.verifyComponentTypes(versionedGroup); + } + + @Override + public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + verifyImportProcessGroup(versionControlInfo, contents, group); + } + + private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) { + if (group == null) { + return; + } + + final VersionControlInformation vci = group.getVersionControlInformation(); + if (vci != null) { + // Note that we do not compare the Registry ID here because there could be two registry clients + // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance).. + if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier()) + && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) { + + throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. " + + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A."); + } + } + + final Set childGroups = contents.getProcessGroups(); + if (childGroups != null) { + for (final VersionedProcessGroup childGroup : childGroups) { + final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates(); + if (childCoordinates != null) { + final VersionControlInformationDTO childVci = new VersionControlInformationDTO(); + childVci.setBucketId(childCoordinates.getBucketId()); + childVci.setFlowId(childCoordinates.getFlowId()); + verifyImportProcessGroup(childVci, childGroup, group); + } + } + } + + verifyImportProcessGroup(vciDto, contents, group.getParent()); + } + + @Override + public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional idGenerationSeed) { + // get the specified snippet + final Snippet snippet = snippetDAO.getSnippet(snippetId); + + // create the template + final TemplateDTO templateDTO = new TemplateDTO(); + templateDTO.setName(name); + templateDTO.setDescription(description); + templateDTO.setTimestamp(new Date()); + templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true, true)); + templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION); + + // set the id based on the specified seed + final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); + templateDTO.setId(uuid); + + // create the template + final Template template = templateDAO.createTemplate(templateDTO, groupId); + + // drop the snippet + snippetDAO.dropSnippet(snippetId); + + // save the flow + controllerFacade.save(); + + return dtoFactory.createTemplateDTO(template); + } + + /** + * Ensures default values are populated for all components in this snippet. This is necessary to handle old templates without default values + * and when existing properties have default values introduced. + * + * @param snippet snippet + */ + private void ensureDefaultPropertyValuesArePopulated(final FlowSnippetDTO snippet) { + if (snippet != null) { + if (snippet.getControllerServices() != null) { + snippet.getControllerServices().forEach(dto -> { + if (dto.getProperties() == null) { + dto.setProperties(new LinkedHashMap<>()); + } + + try { + final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle()); + configurableComponent.getPropertyDescriptors().forEach(descriptor -> { + if (dto.getProperties().get(descriptor.getName()) == null) { + dto.getProperties().put(descriptor.getName(), descriptor.getDefaultValue()); + } + }); + } catch (final Exception e) { + logger.warn(String.format("Unable to create ControllerService of type %s to populate default values.", dto.getType())); + } + }); + } + + if (snippet.getProcessors() != null) { + snippet.getProcessors().forEach(dto -> { + if (dto.getConfig() == null) { + dto.setConfig(new ProcessorConfigDTO()); + } + + final ProcessorConfigDTO config = dto.getConfig(); + if (config.getProperties() == null) { + config.setProperties(new LinkedHashMap<>()); + } + + try { + final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle()); + configurableComponent.getPropertyDescriptors().forEach(descriptor -> { + if (config.getProperties().get(descriptor.getName()) == null) { + config.getProperties().put(descriptor.getName(), descriptor.getDefaultValue()); + } + }); + } catch (final Exception e) { + logger.warn(String.format("Unable to create Processor of type %s to populate default values.", dto.getType())); + } + }); + } + + if (snippet.getProcessGroups() != null) { + snippet.getProcessGroups().forEach(processGroup -> { + ensureDefaultPropertyValuesArePopulated(processGroup.getContents()); + }); + } + } + } + + @Override + public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional idGenerationSeed) { + // ensure id is set + final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); + templateDTO.setId(uuid); + + // mark the timestamp + templateDTO.setTimestamp(new Date()); + + // ensure default values are populated + ensureDefaultPropertyValuesArePopulated(templateDTO.getSnippet()); + + // import the template + final Template template = templateDAO.importTemplate(templateDTO, groupId); + + // save the flow + controllerFacade.save(); + + // return the template dto + return dtoFactory.createTemplateDTO(template); + } + + /** + * Post processes a new flow snippet including validation, removing the snippet, and DTO conversion. + * + * @param groupId group id + * @param snippet snippet + * @return flow dto + */ + private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) { + // validate the new snippet + validateSnippetContents(snippet); + + // identify all components added + final Set identifiers = new HashSet<>(); + snippet.getProcessors().stream() + .map(proc -> proc.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getConnections().stream() + .map(conn -> conn.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getInputPorts().stream() + .map(port -> port.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getOutputPorts().stream() + .map(port -> port.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getProcessGroups().stream() + .map(group -> group.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .map(remoteGroup -> remoteGroup.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null) + .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream()) + .map(remoteInputPort -> remoteInputPort.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getRemoteProcessGroups().stream() + .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null) + .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream()) + .map(remoteOutputPort -> remoteOutputPort.getId()) + .forEach(id -> identifiers.add(id)); + snippet.getLabels().stream() + .map(label -> label.getId()) + .forEach(id -> identifiers.add(id)); + + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); + return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins); + } + + @Override + public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateEncodingVersion, + final FlowSnippetDTO requestSnippet, final String idGenerationSeed) { + + // instantiate the template - there is no need to make another copy of the flow snippet since the actual template + // was copied and this dto is only used to instantiate it's components (which as already completed) + final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateEncodingVersion, requestSnippet, idGenerationSeed); + + // save the flow + controllerFacade.save(); + + // post process the new flow snippet + final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); + + final FlowEntity flowEntity = new FlowEntity(); + flowEntity.setFlow(flowDto); + return flowEntity; + } + + @Override + public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) { + controllerServiceDTO.setParentGroupId(groupId); + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // request claim for component to be created... revision already verified (version == 0) + final RevisionClaim claim = new StandardRevisionClaim(revision); + + final RevisionUpdate snapshot; + if (groupId == null) { + // update revision through revision manager + snapshot = revisionManager.updateRevision(claim, user, () -> { + // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock + // on the group. The Controller Service may or may not have a Process Group (it won't if it's controller-scoped). + final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); + controllerFacade.save(); + + awaitValidationCompletion(controllerService); + final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); + + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate<>(dto, lastMod); + }); + } else { + snapshot = revisionManager.updateRevision(claim, user, () -> { + final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); + controllerFacade.save(); + + awaitValidationCompletion(controllerService); + final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); + + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate<>(dto, lastMod); + }); + } + + final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); + } + + @Override + public ControllerServiceEntity updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) { + // get the component, ensure we have access to it, and perform the update request + final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + controllerService, + () -> controllerServiceDAO.updateControllerService(controllerServiceDTO), + cs -> { + awaitValidationCompletion(cs); + final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(cs); + final ControllerServiceReference ref = controllerService.getReferences(); + final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = + createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerService.getIdentifier())); + dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents()); + return dto; + }); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); + } + + + @Override + public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents( + final Map referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { + + final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values()); + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionUpdate update = revisionManager.updateRevision(claim, user, + new UpdateRevisionTask() { + @Override + public RevisionUpdate update() { + final Set updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); + final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences(); + + // get the revisions of the updated components + final Map updatedRevisions = new HashMap<>(); + for (final ComponentNode component : updated) { + final Revision currentRevision = revisionManager.getRevision(component.getIdentifier()); + final Revision requestRevision = referenceRevisions.get(component.getIdentifier()); + updatedRevisions.put(component.getIdentifier(), currentRevision.incrementRevision(requestRevision.getClientId())); + } + + // ensure the revision for all referencing components is included regardless of whether they were updated in this request + for (final ComponentNode component : updatedReference.findRecursiveReferences(ComponentNode.class)) { + updatedRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); + } + + final ControllerServiceReferencingComponentsEntity entity = createControllerServiceReferencingComponentsEntity(updatedReference, updatedRevisions); + return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); + } + }); + + return update.getComponent(); + } + + /** + * Finds the identifiers for all components referencing a ControllerService. + * + * @param reference ControllerServiceReference + * @param visited ControllerServices we've already visited + */ + private void findControllerServiceReferencingComponentIdentifiers(final ControllerServiceReference reference, final Set visited) { + for (final ComponentNode component : reference.getReferencingComponents()) { + + // if this is a ControllerService consider it's referencing components + if (component instanceof ControllerServiceNode) { + final ControllerServiceNode node = (ControllerServiceNode) component; + if (!visited.contains(node)) { + visited.add(node); + findControllerServiceReferencingComponentIdentifiers(node.getReferences(), visited); + } + } + } + } + + /** + * Creates entities for components referencing a ControllerService using their current revision. + * + * @param reference ControllerServiceReference + * @return The entity + */ + private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set lockedIds) { + final Set visited = new HashSet<>(); + visited.add(reference.getReferencedComponent()); + findControllerServiceReferencingComponentIdentifiers(reference, visited); + + final Map referencingRevisions = new HashMap<>(); + for (final ComponentNode component : reference.getReferencingComponents()) { + referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); + } + + return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions); + } + + /** + * Creates entities for components referencing a ControllerService using the specified revisions. + * + * @param reference ControllerServiceReference + * @param revisions The revisions + * @return The entity + */ + private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity( + final ControllerServiceReference reference, final Map revisions) { + final Set visited = new HashSet<>(); + visited.add(reference.getReferencedComponent()); + return createControllerServiceReferencingComponentsEntity(reference, revisions, visited); + } + + /** + * Creates entities for components referencing a ControllerServcie using the specified revisions. + * + * @param reference ControllerServiceReference + * @param revisions The revisions + * @param visited Which services we've already considered (in case of cycle) + * @return The entity + */ + private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity( + final ControllerServiceReference reference, final Map revisions, final Set visited) { + + final String modifier = NiFiUserUtils.getNiFiUserIdentity(); + final Set referencingComponents = reference.getReferencingComponents(); + + final Set componentEntities = new HashSet<>(); + for (final ComponentNode refComponent : referencingComponents) { + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(refComponent); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(refComponent)); + + final Revision revision = revisions.get(refComponent.getIdentifier()); + final FlowModification flowMod = new FlowModification(revision, modifier); + final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(flowMod); + final ControllerServiceReferencingComponentDTO dto = dtoFactory.createControllerServiceReferencingComponentDTO(refComponent); + + if (refComponent instanceof ControllerServiceNode) { + final ControllerServiceNode node = (ControllerServiceNode) refComponent; + + // indicate if we've hit a cycle + dto.setReferenceCycle(visited.contains(node)); + + // mark node as visited before building the reference cycle + visited.add(node); + + // if we haven't encountered this service before include it's referencing components + if (!dto.getReferenceCycle()) { + final ControllerServiceReference refReferences = node.getReferences(); + final Map referencingRevisions = new HashMap<>(revisions); + for (final ComponentNode component : refReferences.getReferencingComponents()) { + referencingRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); + } + final ControllerServiceReferencingComponentsEntity references = createControllerServiceReferencingComponentsEntity(refReferences, referencingRevisions, visited); + dto.setReferencingComponents(references.getControllerServiceReferencingComponents()); + } + } + + componentEntities.add(entityFactory.createControllerServiceReferencingComponentEntity(refComponent.getIdentifier(), dto, revisionDto, permissions, operatePermissions)); + } + + final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity(); + entity.setControllerServiceReferencingComponents(componentEntities); + return entity; + } + + @Override + public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) { + final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); + final ControllerServiceDTO snapshot = deleteComponent( + revision, + controllerService.getResource(), + () -> controllerServiceDAO.deleteControllerService(controllerServiceId), + true, + dtoFactory.createControllerServiceDto(controllerService)); + + return entityFactory.createControllerServiceEntity(snapshot, null, permissions, operatePermissions, null); + } + + + @Override + public RegistryClientEntity createRegistryClient(Revision revision, RegistryDTO registryDTO) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // request claim for component to be created... revision already verified (version == 0) + final RevisionClaim claim = new StandardRevisionClaim(revision); + + // update revision through revision manager + final RevisionUpdate revisionUpdate = revisionManager.updateRevision(claim, user, () -> { + // add the component + final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO); + + // save the flow + controllerFacade.save(); + + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate<>(registry, lastMod); + }); + + final FlowRegistry registry = revisionUpdate.getComponent(); + return createRegistryClientEntity(registry); + } + + @Override + public RegistryClientEntity getRegistryClient(final String registryId) { + final FlowRegistry registry = registryDAO.getFlowRegistry(registryId); + return createRegistryClientEntity(registry); + } + + private RegistryClientEntity createRegistryClientEntity(final FlowRegistry flowRegistry) { + if (flowRegistry == null) { + return null; + } + + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(flowRegistry.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getController()); + final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry); + + return entityFactory.createRegistryClientEntity(dto, revision, permissions); + } + + private VersionedFlowEntity createVersionedFlowEntity(final String registryId, final VersionedFlow versionedFlow) { + if (versionedFlow == null) { + return null; + } + + final VersionedFlowDTO dto = new VersionedFlowDTO(); + dto.setRegistryId(registryId); + dto.setBucketId(versionedFlow.getBucketIdentifier()); + dto.setFlowId(versionedFlow.getIdentifier()); + dto.setFlowName(versionedFlow.getName()); + dto.setDescription(versionedFlow.getDescription()); + + final VersionedFlowEntity entity = new VersionedFlowEntity(); + entity.setVersionedFlow(dto); + + return entity; + } + + private VersionedFlowSnapshotMetadataEntity createVersionedFlowSnapshotMetadataEntity(final String registryId, final VersionedFlowSnapshotMetadata metadata) { + if (metadata == null) { + return null; + } + + final VersionedFlowSnapshotMetadataEntity entity = new VersionedFlowSnapshotMetadataEntity(); + entity.setRegistryId(registryId); + entity.setVersionedFlowMetadata(metadata); + + return entity; + } + + @Override + public Set getRegistryClients() { + return registryDAO.getFlowRegistries().stream() + .map(this::createRegistryClientEntity) + .collect(Collectors.toSet()); + } + + @Override + public Set getRegistriesForUser(final NiFiUser user) { + return registryDAO.getFlowRegistriesForUser(user).stream() + .map(flowRegistry -> entityFactory.createRegistryEntity(dtoFactory.createRegistryDto(flowRegistry))) + .collect(Collectors.toSet()); + } + + @Override + public Set getBucketsForUser(final String registryId, final NiFiUser user) { + return registryDAO.getBucketsForUser(registryId, user).stream() + .map(bucket -> { + if (bucket == null) { + return null; + } + + final BucketDTO dto = new BucketDTO(); + dto.setId(bucket.getIdentifier()); + dto.setName(bucket.getName()); + dto.setDescription(bucket.getDescription()); + dto.setCreated(bucket.getCreatedTimestamp()); + + final Permissions regPermissions = bucket.getPermissions(); + final PermissionsDTO permissions = new PermissionsDTO(); + permissions.setCanRead(regPermissions.getCanRead()); + permissions.setCanWrite(regPermissions.getCanWrite()); + + return entityFactory.createBucketEntity(dto, permissions); + }) + .collect(Collectors.toSet()); + } + + @Override + public Set getFlowsForUser(String registryId, String bucketId, NiFiUser user) { + return registryDAO.getFlowsForUser(registryId, bucketId, user).stream() + .map(vf -> createVersionedFlowEntity(registryId, vf)) + .collect(Collectors.toSet()); + } + + @Override + public Set getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) { + return registryDAO.getFlowVersionsForUser(registryId, bucketId, flowId, user).stream() + .map(md -> createVersionedFlowSnapshotMetadataEntity(registryId, md)) + .collect(Collectors.toSet()); + } + + @Override + public RegistryClientEntity updateRegistryClient(Revision revision, RegistryDTO registryDTO) { + final RevisionClaim revisionClaim = new StandardRevisionClaim(revision); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId()); + final RevisionUpdate revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> { + final boolean duplicateName = registryDAO.getFlowRegistries().stream() + .anyMatch(reg -> reg.getName().equals(registryDTO.getName()) && !reg.getIdentifier().equals(registryDTO.getId())); + + if (duplicateName) { + throw new IllegalStateException("Cannot update Flow Registry because a Flow Registry already exists with the name " + registryDTO.getName()); + } + + registry.setDescription(registryDTO.getDescription()); + registry.setName(registryDTO.getName()); + registry.setURL(registryDTO.getUri()); + + controllerFacade.save(); + + final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); + final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); + + return new StandardRevisionUpdate<>(registry, lastModification); + }); + + final FlowRegistry updatedReg = revisionUpdate.getComponent(); + return createRegistryClientEntity(updatedReg); + } + + @Override + public void verifyDeleteRegistry(String registryId) { + processGroupDAO.verifyDeleteFlowRegistry(registryId); + } + + @Override + public RegistryClientEntity deleteRegistryClient(final Revision revision, final String registryId) { + final RevisionClaim claim = new StandardRevisionClaim(revision); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> { + final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId); + controllerFacade.save(); + return reg; + }); + + return createRegistryClientEntity(registry); + } + + @Override + public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // request claim for component to be created... revision already verified (version == 0) + final RevisionClaim claim = new StandardRevisionClaim(revision); + + // update revision through revision manager + final RevisionUpdate snapshot = revisionManager.updateRevision(claim, user, () -> { + // create the reporting task + final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO); + + // save the update + controllerFacade.save(); + awaitValidationCompletion(reportingTask); + + final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate<>(dto, lastMod); + }); + + final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); + } + + @Override + public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { + // get the component, ensure we have access to it, and perform the update request + final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); + final RevisionUpdate snapshot = updateComponent(revision, + reportingTask, + () -> reportingTaskDAO.updateReportingTask(reportingTaskDTO), + rt -> { + awaitValidationCompletion(rt); + return dtoFactory.createReportingTaskDto(rt); + }); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); + } + + @Override + public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) { + final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); + final ReportingTaskDTO snapshot = deleteComponent( + revision, + reportingTask.getResource(), + () -> reportingTaskDAO.deleteReportingTask(reportingTaskId), + true, + dtoFactory.createReportingTaskDto(reportingTask)); + + return entityFactory.createReportingTaskEntity(snapshot, null, permissions, operatePermissions, null); + } + + @Override + public void deleteActions(final Date endDate) { + // get the user from the request + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user == null) { + throw new WebApplicationException(new Throwable("Unable to access details for current user.")); + } + + // create the purge details + final FlowChangePurgeDetails details = new FlowChangePurgeDetails(); + details.setEndDate(endDate); + + // create a purge action to record that records are being removed + final FlowChangeAction purgeAction = new FlowChangeAction(); + purgeAction.setUserIdentity(user.getIdentity()); + purgeAction.setOperation(Operation.Purge); + purgeAction.setTimestamp(new Date()); + purgeAction.setSourceId("Flow Controller"); + purgeAction.setSourceName("History"); + purgeAction.setSourceType(Component.Controller); + purgeAction.setActionDetails(details); + + // purge corresponding actions + auditService.purgeActions(endDate, purgeAction); + } + + @Override + public ProvenanceDTO submitProvenance(final ProvenanceDTO query) { + return controllerFacade.submitProvenance(query); + } + + @Override + public void deleteProvenance(final String queryId) { + controllerFacade.deleteProvenanceQuery(queryId); + } + + @Override + public LineageDTO submitLineage(final LineageDTO lineage) { + return controllerFacade.submitLineage(lineage); + } + + @Override + public void deleteLineage(final String lineageId) { + controllerFacade.deleteLineage(lineageId); + } + + @Override + public ProvenanceEventDTO submitReplay(final Long eventId) { + return controllerFacade.submitReplay(eventId); + } + + // ----------------------------------------- + // Read Operations + // ----------------------------------------- + + @Override + public SearchResultsDTO searchController(final String query) { + return controllerFacade.search(query); + } + + @Override + public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) { + return connectionDAO.getContent(connectionId, flowFileUuid, uri); + } + + @Override + public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) { + return controllerFacade.getContent(eventId, uri, contentDirection); + } + + @Override + public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) { + return controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults); + } + + @Override + public LineageDTO getLineage(final String lineageId) { + return controllerFacade.getLineage(lineageId); + } + + @Override + public ProvenanceOptionsDTO getProvenanceSearchOptions() { + return controllerFacade.getProvenanceSearchOptions(); + } + + @Override + public ProvenanceEventDTO getProvenanceEvent(final Long id) { + return controllerFacade.getProvenanceEvent(id); + } + + @Override + public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); + final ProcessGroupStatusDTO dto = dtoFactory.createProcessGroupStatusDto(processGroup, controllerFacade.getProcessGroupStatus(groupId)); + + // prune the response as necessary + if (!recursive) { + pruneChildGroups(dto.getAggregateSnapshot()); + if (dto.getNodeSnapshots() != null) { + for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : dto.getNodeSnapshots()) { + pruneChildGroups(nodeSnapshot.getStatusSnapshot()); + } + } + } + + return entityFactory.createProcessGroupStatusEntity(dto, permissions); + } + + private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) { + for (final ProcessGroupStatusSnapshotEntity childProcessGroupStatusEntity : snapshot.getProcessGroupStatusSnapshots()) { + final ProcessGroupStatusSnapshotDTO childProcessGroupStatus = childProcessGroupStatusEntity.getProcessGroupStatusSnapshot(); + childProcessGroupStatus.setConnectionStatusSnapshots(null); + childProcessGroupStatus.setProcessGroupStatusSnapshots(null); + childProcessGroupStatus.setInputPortStatusSnapshots(null); + childProcessGroupStatus.setOutputPortStatusSnapshots(null); + childProcessGroupStatus.setProcessorStatusSnapshots(null); + childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null); + } + } + + @Override + public ControllerStatusDTO getControllerStatus() { + return controllerFacade.getControllerStatus(); + } + + @Override + public ComponentStateDTO getProcessorState(final String processorId) { + final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; + final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); + + // processor will be non null as it was already found when getting the state + final ProcessorNode processor = processorDAO.getProcessor(processorId); + return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState); + } + + @Override + public ComponentStateDTO getControllerServiceState(final String controllerServiceId) { + final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; + final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); + + // controller service will be non null as it was already found when getting the state + final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); + return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState); + } + + @Override + public ComponentStateDTO getReportingTaskState(final String reportingTaskId) { + final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; + final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); + + // reporting task will be non null as it was already found when getting the state + final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); + return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState); + } + + @Override + public CountersDTO getCounters() { + final List counters = controllerFacade.getCounters(); + final Set counterDTOs = new LinkedHashSet<>(counters.size()); + for (final Counter counter : counters) { + counterDTOs.add(dtoFactory.createCounterDto(counter)); + } + + final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs); + final CountersDTO countersDto = new CountersDTO(); + countersDto.setAggregateSnapshot(snapshotDto); + + return countersDto; + } + + private ConnectionEntity createConnectionEntity(final Connection connection) { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); + final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier())); + return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, permissions, status); + } + + @Override + public Set getConnections(final String groupId) { + final Set connections = connectionDAO.getConnections(groupId); + return connections.stream() + .map(connection -> createConnectionEntity(connection)) + .collect(Collectors.toSet()); + } + + @Override + public ConnectionEntity getConnection(final String connectionId) { + final Connection connection = connectionDAO.getConnection(connectionId); + return createConnectionEntity(connection); + } + + @Override + public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) { + return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId)); + } + + @Override + public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) { + final Connection connection = connectionDAO.getConnection(connectionId); + final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(connectionId, listingRequestId)); + + // include whether the source and destination are running + if (connection.getSource() != null) { + listRequest.setSourceRunning(connection.getSource().isRunning()); + } + if (connection.getDestination() != null) { + listRequest.setDestinationRunning(connection.getDestination().isRunning()); + } + + return listRequest; + } + + @Override + public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) { + return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid)); + } + + @Override + public ConnectionStatusEntity getConnectionStatus(final String connectionId) { + final Connection connection = connectionDAO.getConnection(connectionId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); + final ConnectionStatusDTO dto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId)); + return entityFactory.createConnectionStatusEntity(dto, permissions); + } + + @Override + public StatusHistoryEntity getConnectionStatusHistory(final String connectionId) { + final Connection connection = connectionDAO.getConnection(connectionId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); + final StatusHistoryDTO dto = controllerFacade.getConnectionStatusHistory(connectionId); + return entityFactory.createStatusHistoryEntity(dto, permissions); + } + + private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user); + final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); + final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities); + } + + @Override + public Set getProcessors(final String groupId, final boolean includeDescendants) { + final Set processors = processorDAO.getProcessors(groupId, includeDescendants); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + return processors.stream() + .map(processor -> createProcessorEntity(processor, user)) + .collect(Collectors.toSet()); + } + + @Override + public TemplateDTO exportTemplate(final String id) { + final Template template = templateDAO.getTemplate(id); + final TemplateDTO templateDetails = template.getDetails(); + + final TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template); + templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet())); + return templateDTO; + } + + @Override + public TemplateDTO getTemplate(final String id) { + return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id)); + } + + @Override + public Set getTemplates() { + return templateDAO.getTemplates().stream() + .map(template -> { + final TemplateDTO dto = dtoFactory.createTemplateDTO(template); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(template); + + final TemplateEntity entity = new TemplateEntity(); + entity.setId(dto.getId()); + entity.setPermissions(permissions); + entity.setTemplate(dto); + return entity; + }).collect(Collectors.toSet()); + } + + @Override + public Set getWorkQueuePrioritizerTypes() { + return controllerFacade.getFlowFileComparatorTypes(); + } + + @Override + public Set getProcessorTypes(final String bundleGroup, final String bundleArtifact, final String type) { + return controllerFacade.getFlowFileProcessorTypes(bundleGroup, bundleArtifact, type); + } + + @Override + public Set getControllerServiceTypes(final String serviceType, final String serviceBundleGroup, final String serviceBundleArtifact, final String serviceBundleVersion, + final String bundleGroup, final String bundleArtifact, final String type) { + return controllerFacade.getControllerServiceTypes(serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion, bundleGroup, bundleArtifact, type); + } + + @Override + public Set getReportingTaskTypes(final String bundleGroup, final String bundleArtifact, final String type) { + return controllerFacade.getReportingTaskTypes(bundleGroup, bundleArtifact, type); + } + + @Override + public ProcessorEntity getProcessor(final String id) { + final ProcessorNode processor = processorDAO.getProcessor(id); + return createProcessorEntity(processor, NiFiUserUtils.getNiFiUser()); + } + + @Override + public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) { + final ProcessorNode processor = processorDAO.getProcessor(id); + PropertyDescriptor descriptor = processor.getPropertyDescriptor(property); + + // return an invalid descriptor if the processor doesn't support this property + if (descriptor == null) { + descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build(); + } + + return dtoFactory.createPropertyDescriptorDto(descriptor, processor.getProcessGroup().getIdentifier()); + } + + @Override + public ProcessorStatusEntity getProcessorStatus(final String id) { + final ProcessorNode processor = processorDAO.getProcessor(id); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); + final ProcessorStatusDTO dto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id)); + return entityFactory.createProcessorStatusEntity(dto, permissions); + } + + @Override + public StatusHistoryEntity getProcessorStatusHistory(final String id) { + final ProcessorNode processor = processorDAO.getProcessor(id); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); + final StatusHistoryDTO dto = controllerFacade.getProcessorStatusHistory(id); + return entityFactory.createStatusHistoryEntity(dto, permissions); + } + + private boolean authorizeBulletin(final Bulletin bulletin) { + final String sourceId = bulletin.getSourceId(); + final ComponentType type = bulletin.getSourceType(); + + final Authorizable authorizable; + try { + switch (type) { + case PROCESSOR: + authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable(); + break; + case REPORTING_TASK: + authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable(); + break; + case CONTROLLER_SERVICE: + authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable(); + break; + case FLOW_CONTROLLER: + authorizable = controllerFacade; + break; + case INPUT_PORT: + authorizable = authorizableLookup.getInputPort(sourceId); + break; + case OUTPUT_PORT: + authorizable = authorizableLookup.getOutputPort(sourceId); + break; + case REMOTE_PROCESS_GROUP: + authorizable = authorizableLookup.getRemoteProcessGroup(sourceId); + break; + default: + throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build()); + } + } catch (final ResourceNotFoundException e) { + // if the underlying component is gone, disallow + return false; + } + + // perform the authorization + final AuthorizationResult result = authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + return Result.Approved.equals(result.getResult()); + } + + @Override + public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) { + // build the query + final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder() + .groupIdMatches(query.getGroupId()) + .sourceIdMatches(query.getSourceId()) + .nameMatches(query.getName()) + .messageMatches(query.getMessage()) + .after(query.getAfter()) + .limit(query.getLimit()); + + // perform the query + final List results = bulletinRepository.findBulletins(queryBuilder.build()); + + // perform the query and generate the results - iterating in reverse order since we are + // getting the most recent results by ordering by timestamp desc above. this gets the + // exact results we want but in reverse order + final List bulletinEntities = new ArrayList<>(); + for (final ListIterator bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious(); ) { + final Bulletin bulletin = bulletinIter.previous(); + bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin))); + } + + // create the bulletin board + final BulletinBoardDTO bulletinBoard = new BulletinBoardDTO(); + bulletinBoard.setBulletins(bulletinEntities); + bulletinBoard.setGenerated(new Date()); + return bulletinBoard; + } + + @Override + public SystemDiagnosticsDTO getSystemDiagnostics() { + final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics(); + return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics); + } + + @Override + public List getResources() { + final List resources = controllerFacade.getResources(); + final List resourceDtos = new ArrayList<>(resources.size()); + for (final Resource resource : resources) { + resourceDtos.add(dtoFactory.createResourceDto(resource)); + } + return resourceDtos; + } + + @Override + public void discoverCompatibleBundles(VersionedProcessGroup versionedGroup) { + BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), versionedGroup); + } + + @Override + public BundleCoordinate getCompatibleBundle(String type, BundleDTO bundleDTO) { + return BundleUtils.getCompatibleBundle(controllerFacade.getExtensionManager(), type, bundleDTO); + } + + @Override + public ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate) { + return controllerFacade.getExtensionManager().getTempComponent(classType, bundleCoordinate); + } + + /** + * Ensures the specified user has permission to access the specified port. This method does + * not utilize the DataTransferAuthorizable as that will enforce the entire chain is + * authorized for the transfer. This method is only invoked when obtaining the site to site + * details so the entire chain isn't necessary. + */ + private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) { + final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure()); + + // if site to site is not secure, allow all users + if (!isSiteToSiteSecure) { + return true; + } + + final Map userContext; + if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) { + userContext = new HashMap<>(); + userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress()); + } else { + userContext = null; + } + + final AuthorizationRequest request = new AuthorizationRequest.Builder() + .resource(ResourceFactory.getDataTransferResource(port.getResource())) + .identity(user.getIdentity()) + .groups(user.getGroups()) + .anonymous(user.isAnonymous()) + .accessAttempt(false) + .action(RequestAction.WRITE) + .userContext(userContext) + .explanationSupplier(() -> "Unable to retrieve port details.") + .build(); + + final AuthorizationResult result = authorizer.authorize(request); + return Result.Approved.equals(result.getResult()); + } + + @Override + public ControllerDTO getSiteToSiteDetails() { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user == null) { + throw new WebApplicationException(new Throwable("Unable to access details for current user.")); + } + + // serialize the input ports this NiFi has access to + final Set inputPortDtos = new LinkedHashSet<>(); + final Set inputPorts = controllerFacade.getInputPorts(); + for (final RootGroupPort inputPort : inputPorts) { + if (isUserAuthorized(user, inputPort)) { + final PortDTO dto = new PortDTO(); + dto.setId(inputPort.getIdentifier()); + dto.setName(inputPort.getName()); + dto.setComments(inputPort.getComments()); + dto.setState(inputPort.getScheduledState().toString()); + inputPortDtos.add(dto); + } + } + + // serialize the output ports this NiFi has access to + final Set outputPortDtos = new LinkedHashSet<>(); + for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) { + if (isUserAuthorized(user, outputPort)) { + final PortDTO dto = new PortDTO(); + dto.setId(outputPort.getIdentifier()); + dto.setName(outputPort.getName()); + dto.setComments(outputPort.getComments()); + dto.setState(outputPort.getScheduledState().toString()); + outputPortDtos.add(dto); + } + } + + // get the root group + final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); + final ProcessGroupCounts counts = rootGroup.getCounts(); + + // create the controller dto + final ControllerDTO controllerDTO = new ControllerDTO(); + controllerDTO.setId(controllerFacade.getRootGroupId()); + controllerDTO.setInstanceId(controllerFacade.getInstanceId()); + controllerDTO.setName(controllerFacade.getName()); + controllerDTO.setComments(controllerFacade.getComments()); + controllerDTO.setInputPorts(inputPortDtos); + controllerDTO.setOutputPorts(outputPortDtos); + controllerDTO.setInputPortCount(inputPortDtos.size()); + controllerDTO.setOutputPortCount(outputPortDtos.size()); + controllerDTO.setRunningCount(counts.getRunningCount()); + controllerDTO.setStoppedCount(counts.getStoppedCount()); + controllerDTO.setInvalidCount(counts.getInvalidCount()); + controllerDTO.setDisabledCount(counts.getDisabledCount()); + + // determine the site to site configuration + controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort()); + controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort()); + controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure()); + + return controllerDTO; + } + + @Override + public ControllerConfigurationEntity getControllerConfiguration() { + final Revision rev = revisionManager.getRevision(FlowController.class.getSimpleName()); + final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade); + final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); + return entityFactory.createControllerConfigurationEntity(dto, revision, permissions); + } + + @Override + public ControllerBulletinsEntity getControllerBulletins() { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity(); + + final List controllerBulletinEntities = new ArrayList<>(); + + final Authorizable controllerAuthorizable = authorizableLookup.getController(); + final boolean authorized = controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()); + controllerBulletinEntities.addAll(bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList())); + + // get the controller service bulletins + final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); + final List allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery); + final List controllerServiceBulletinEntities = new ArrayList<>(); + for (final Bulletin bulletin : allControllerServiceBulletins) { + try { + final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable(); + final boolean controllerServiceAuthorized = controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); + + final BulletinEntity controllerServiceBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized); + controllerServiceBulletinEntities.add(controllerServiceBulletin); + controllerBulletinEntities.add(controllerServiceBulletin); + } catch (final ResourceNotFoundException e) { + // controller service missing.. skip + } + } + controllerBulletinsEntity.setControllerServiceBulletins(controllerServiceBulletinEntities); + + // get the reporting task bulletins + final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); + final List allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery); + final List reportingTaskBulletinEntities = new ArrayList<>(); + for (final Bulletin bulletin : allReportingTaskBulletins) { + try { + final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable(); + final boolean reportingTaskAuthorizableAuthorized = reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); + + final BulletinEntity reportingTaskBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized); + reportingTaskBulletinEntities.add(reportingTaskBulletin); + controllerBulletinEntities.add(reportingTaskBulletin); + } catch (final ResourceNotFoundException e) { + // reporting task missing.. skip + } + } + controllerBulletinsEntity.setReportingTaskBulletins(reportingTaskBulletinEntities); + + controllerBulletinsEntity.setBulletins(pruneAndSortBulletins(controllerBulletinEntities, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER)); + return controllerBulletinsEntity; + } + + @Override + public FlowConfigurationEntity getFlowConfiguration() { + final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval(), + properties.getDefaultBackPressureObjectThreshold(), properties.getDefaultBackPressureDataSizeThreshold(),properties.getDcaeDistributorApiHostname()); + final FlowConfigurationEntity entity = new FlowConfigurationEntity(); + entity.setFlowConfiguration(dto); + return entity; + } + + @Override + public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) { + final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); + return createAccessPolicyEntity(accessPolicy); + } + + @Override + public AccessPolicyEntity getAccessPolicy(final RequestAction requestAction, final String resource) { + Authorizable authorizable; + try { + authorizable = authorizableLookup.getAuthorizableFromResource(resource); + } catch (final ResourceNotFoundException e) { + // unable to find the underlying authorizable... user authorized based on top level /policies... create + // an anonymous authorizable to attempt to locate an existing policy for this resource + authorizable = new Authorizable() { + @Override + public Authorizable getParentAuthorizable() { + return null; + } + + @Override + public Resource getResource() { + return new Resource() { + @Override + public String getIdentifier() { + return resource; + } + + @Override + public String getName() { + return resource; + } + + @Override + public String getSafeDescription() { + return "Policy " + resource; + } + }; + } + }; + } + + final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(requestAction, authorizable); + return createAccessPolicyEntity(accessPolicy); + } + + private AccessPolicyEntity createAccessPolicyEntity(final AccessPolicy accessPolicy) { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicy.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicy.getIdentifier())); + final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); + return entityFactory.createAccessPolicyEntity( + dtoFactory.createAccessPolicyDto(accessPolicy, + accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()), + accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()), componentReference), + revision, permissions); + } + + @Override + public UserEntity getUser(final String userId) { + final User user = userDAO.getUser(userId); + return createUserEntity(user, true); + } + + @Override + public Set getUsers() { + final Set users = userDAO.getUsers(); + return users.stream() + .map(user -> createUserEntity(user, false)) + .collect(Collectors.toSet()); + } + + private UserEntity createUserEntity(final User user, final boolean enforceUserExistence) { + final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); + final Set userGroups = userGroupDAO.getUserGroupsForUser(user.getIdentifier()).stream() + .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(enforceUserExistence)).collect(Collectors.toSet()); + final Set policyEntities = userGroupDAO.getAccessPoliciesForUser(user.getIdentifier()).stream() + .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); + return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups, policyEntities), userRevision, permissions); + } + + private UserGroupEntity createUserGroupEntity(final Group userGroup, final boolean enforceGroupExistence) { + final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); + final Set users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(enforceGroupExistence)).collect(Collectors.toSet()); + final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream() + .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); + return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users, policyEntities), userGroupRevision, permissions); + } + + @Override + public UserGroupEntity getUserGroup(final String userGroupId) { + final Group userGroup = userGroupDAO.getUserGroup(userGroupId); + return createUserGroupEntity(userGroup, true); + } + + @Override + public Set getUserGroups() { + final Set userGroups = userGroupDAO.getUserGroups(); + return userGroups.stream() + .map(userGroup -> createUserGroupEntity(userGroup, false)) + .collect(Collectors.toSet()); + } + + private LabelEntity createLabelEntity(final Label label) { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); + return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), revision, permissions); + } + + @Override + public Set getLabels(final String groupId) { + final Set

+ * Formerly, if the docsDirectory did not exist NIFI would fail to start + * with an IllegalStateException and a rather unhelpful log message. + * NIFI-2184 updates the process such that if the docsDirectory does not + * exist an attempt will be made to create the directory. If that is + * successful NIFI will no longer fail and will start successfully barring + * any other errors. The side effect of the docsDirectory not being present + * is that the documentation links under the 'General' portion of the help + * page will not be accessible, but at least the process will be running. + * + * @param docsDirectory Name of documentation directory in installation directory. + * @return A File object to the documentation directory; else startUpFailure called. + */ + private File getDocsDir(final String docsDirectory) { + File docsDir; + try { + docsDir = Paths.get(docsDirectory).toRealPath().toFile(); + } catch (IOException ex) { + logger.info("Directory '" + docsDirectory + "' is missing. Some documentation will be unavailable."); + docsDir = new File(docsDirectory).getAbsoluteFile(); + final boolean made = docsDir.mkdirs(); + if (!made) { + logger.error("Failed to create 'docs' directory!"); + startUpFailure(new IOException(docsDir.getAbsolutePath() + " could not be created")); + } + } + return docsDir; + } + + private File getWorkingDocsDirectory(final File componentDocsDirPath) { + File workingDocsDirectory = null; + try { + workingDocsDirectory = componentDocsDirPath.toPath().toRealPath().getParent().toFile(); + } catch (IOException ex) { + logger.error("Failed to load :" + componentDocsDirPath.getAbsolutePath()); + startUpFailure(ex); + } + return workingDocsDirectory; + } + + private File getWebApiDocsDir() { + // load the rest documentation + final File webApiDocsDir = new File(webApiContext.getTempDirectory(), "webapp/docs"); + if (!webApiDocsDir.exists()) { + final boolean made = webApiDocsDir.mkdirs(); + if (!made) { + logger.error("Failed to create " + webApiDocsDir.getAbsolutePath()); + startUpFailure(new IOException(webApiDocsDir.getAbsolutePath() + " could not be created")); + } + } + return webApiDocsDir; + } + + private void configureConnectors(final Server server) throws ServerConfigurationException { + // create the http configuration + final HttpConfiguration httpConfiguration = new HttpConfiguration(); + final int headerSize = DataUnit.parseDataSize(props.getWebMaxHeaderSize(), DataUnit.B).intValue(); + httpConfiguration.setRequestHeaderSize(headerSize); + httpConfiguration.setResponseHeaderSize(headerSize); + + // Check if both HTTP and HTTPS connectors are configured and fail if both are configured + if (bothHttpAndHttpsConnectorsConfigured(props)) { + logger.error("NiFi only supports one mode of HTTP or HTTPS operation, not both simultaneously. " + + "Check the nifi.properties file and ensure that either the HTTP hostname and port or the HTTPS hostname and port are empty"); + startUpFailure(new IllegalStateException("Only one of the HTTP and HTTPS connectors can be configured at one time")); + } + + if (props.getSslPort() != null) { + configureHttpsConnector(server, httpConfiguration); + } else if (props.getPort() != null) { + configureHttpConnector(server, httpConfiguration); + } else { + logger.error("Neither the HTTP nor HTTPS connector was configured in nifi.properties"); + startUpFailure(new IllegalStateException("Must configure HTTP or HTTPS connector")); + } + } + + /** + * Configures an HTTPS connector and adds it to the server. + * + * @param server the Jetty server instance + * @param httpConfiguration the configuration object for the HTTPS protocol settings + */ + private void configureHttpsConnector(Server server, HttpConfiguration httpConfiguration) { + String hostname = props.getProperty(NiFiProperties.WEB_HTTPS_HOST); + final Integer port = props.getSslPort(); + String connectorLabel = "HTTPS"; + final Map httpsNetworkInterfaces = props.getHttpsNetworkInterfaces(); + ServerConnectorCreator scc = (s, c) -> createUnconfiguredSslServerConnector(s, c, port); + + configureGenericConnector(server, httpConfiguration, hostname, port, connectorLabel, httpsNetworkInterfaces, scc); + } + + /** + * Configures an HTTP connector and adds it to the server. + * + * @param server the Jetty server instance + * @param httpConfiguration the configuration object for the HTTP protocol settings + */ + private void configureHttpConnector(Server server, HttpConfiguration httpConfiguration) { + String hostname = props.getProperty(NiFiProperties.WEB_HTTP_HOST); + final Integer port = props.getPort(); + String connectorLabel = "HTTP"; + final Map httpNetworkInterfaces = props.getHttpNetworkInterfaces(); + ServerConnectorCreator scc = (s, c) -> new ServerConnector(s, new HttpConnectionFactory(c)); + + configureGenericConnector(server, httpConfiguration, hostname, port, connectorLabel, httpNetworkInterfaces, scc); + } + + /** + * Configures an HTTP(S) connector for the server given the provided parameters. The functionality between HTTP and HTTPS connectors is largely similar. + * Here the common behavior has been extracted into a shared method and the respective calling methods obtain the right values and a lambda function for the differing behavior. + * + * @param server the Jetty server instance + * @param configuration the HTTP/HTTPS configuration instance + * @param hostname the hostname from the nifi.properties file + * @param port the port to expose + * @param connectorLabel used for log output (e.g. "HTTP" or "HTTPS") + * @param networkInterfaces the map of network interfaces from nifi.properties + * @param serverConnectorCreator a function which accepts a {@code Server} and {@code HttpConnection} instance and returns a {@code ServerConnector} + */ + private void configureGenericConnector(Server server, HttpConfiguration configuration, String hostname, Integer port, String connectorLabel, Map networkInterfaces, + ServerConnectorCreator serverConnectorCreator) { + if (port < 0 || (int) Math.pow(2, 16) <= port) { + throw new ServerConfigurationException("Invalid " + connectorLabel + " port: " + port); + } + + logger.info("Configuring Jetty for " + connectorLabel + " on port: " + port); + + final List serverConnectors = Lists.newArrayList(); + + // Calculate Idle Timeout as twice the auto-refresh interval. This ensures that even with some variance in timing, + // we are able to avoid closing connections from users' browsers most of the time. This can make a significant difference + // in HTTPS connections, as each HTTPS connection that is established must perform the SSL handshake. + final String autoRefreshInterval = props.getAutoRefreshInterval(); + final long autoRefreshMillis = autoRefreshInterval == null ? 30000L : FormatUtils.getTimeDuration(autoRefreshInterval, TimeUnit.MILLISECONDS); + final long idleTimeout = autoRefreshMillis * 2; + + // If the interfaces collection is empty or each element is empty + if (networkInterfaces.isEmpty() || networkInterfaces.values().stream().filter(value -> !Strings.isNullOrEmpty(value)).collect(Collectors.toList()).isEmpty()) { + final ServerConnector serverConnector = serverConnectorCreator.create(server, configuration); + + // Set host and port + if (StringUtils.isNotBlank(hostname)) { + serverConnector.setHost(hostname); + } + serverConnector.setPort(port); + serverConnector.setIdleTimeout(idleTimeout); + serverConnectors.add(serverConnector); + } else { + // Add connectors for all IPs from network interfaces + serverConnectors.addAll(Lists.newArrayList(networkInterfaces.values().stream().map(ifaceName -> { + NetworkInterface iface = null; + try { + iface = NetworkInterface.getByName(ifaceName); + } catch (SocketException e) { + logger.error("Unable to get network interface by name {}", ifaceName, e); + } + if (iface == null) { + logger.warn("Unable to find network interface named {}", ifaceName); + } + return iface; + }).filter(Objects::nonNull).flatMap(iface -> Collections.list(iface.getInetAddresses()).stream()) + .map(inetAddress -> { + final ServerConnector serverConnector = serverConnectorCreator.create(server, configuration); + + // Set host and port + serverConnector.setHost(inetAddress.getHostAddress()); + serverConnector.setPort(port); + serverConnector.setIdleTimeout(idleTimeout); + + return serverConnector; + }).collect(Collectors.toList()))); + } + // Add all connectors + serverConnectors.forEach(server::addConnector); + } + + /** + * Returns true if there are configured properties for both HTTP and HTTPS connectors (specifically port because the hostname can be left blank in the HTTP connector). + * Prints a warning log message with the relevant properties. + * + * @param props the NiFiProperties + * @return true if both ports are present + */ + static boolean bothHttpAndHttpsConnectorsConfigured(NiFiProperties props) { + Integer httpPort = props.getPort(); + String httpHostname = props.getProperty(NiFiProperties.WEB_HTTP_HOST); + + Integer httpsPort = props.getSslPort(); + String httpsHostname = props.getProperty(NiFiProperties.WEB_HTTPS_HOST); + + if (httpPort != null && httpsPort != null) { + logger.warn("Both the HTTP and HTTPS connectors are configured in nifi.properties. Only one of these connectors should be configured. See the NiFi Admin Guide for more details"); + logger.warn("HTTP connector: http://" + httpHostname + ":" + httpPort); + logger.warn("HTTPS connector: https://" + httpsHostname + ":" + httpsPort); + return true; + } + + return false; + } + + private ServerConnector createUnconfiguredSslServerConnector(Server server, HttpConfiguration httpConfiguration, int port) { + // add some secure config + final HttpConfiguration httpsConfiguration = new HttpConfiguration(httpConfiguration); + httpsConfiguration.setSecureScheme("https"); + httpsConfiguration.setSecurePort(port); + httpsConfiguration.addCustomizer(new SecureRequestCustomizer()); + + // build the connector + return new ServerConnector(server, + new SslConnectionFactory(createSslContextFactory(), "http/1.1"), + new HttpConnectionFactory(httpsConfiguration)); + } + + private SslContextFactory createSslContextFactory() { + final SslContextFactory contextFactory = new SslContextFactory(); + configureSslContextFactory(contextFactory, props); + return contextFactory; + } + + protected static void configureSslContextFactory(SslContextFactory contextFactory, NiFiProperties props) { + // require client auth when not supporting login, Kerberos service, or anonymous access + if (props.isClientAuthRequiredForRestApi()) { + contextFactory.setNeedClientAuth(true); + } else { + contextFactory.setWantClientAuth(true); + } + + /* below code sets JSSE system properties when values are provided */ + // keystore properties + if (StringUtils.isNotBlank(props.getProperty(NiFiProperties.SECURITY_KEYSTORE))) { + contextFactory.setKeyStorePath(props.getProperty(NiFiProperties.SECURITY_KEYSTORE)); + } + String keyStoreType = props.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE); + if (StringUtils.isNotBlank(keyStoreType)) { + contextFactory.setKeyStoreType(keyStoreType); + String keyStoreProvider = KeyStoreUtils.getKeyStoreProvider(keyStoreType); + if (StringUtils.isNoneEmpty(keyStoreProvider)) { + contextFactory.setKeyStoreProvider(keyStoreProvider); + } + } + final String keystorePassword = props.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD); + final String keyPassword = props.getProperty(NiFiProperties.SECURITY_KEY_PASSWD); + if (StringUtils.isNotBlank(keystorePassword)) { + // if no key password was provided, then assume the keystore password is the same as the key password. + final String defaultKeyPassword = (StringUtils.isBlank(keyPassword)) ? keystorePassword : keyPassword; + contextFactory.setKeyStorePassword(keystorePassword); + contextFactory.setKeyManagerPassword(defaultKeyPassword); + } else if (StringUtils.isNotBlank(keyPassword)) { + // since no keystore password was provided, there will be no keystore integrity check + contextFactory.setKeyManagerPassword(keyPassword); + } + + // truststore properties + if (StringUtils.isNotBlank(props.getProperty(NiFiProperties.SECURITY_TRUSTSTORE))) { + contextFactory.setTrustStorePath(props.getProperty(NiFiProperties.SECURITY_TRUSTSTORE)); + } + String trustStoreType = props.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE); + if (StringUtils.isNotBlank(trustStoreType)) { + contextFactory.setTrustStoreType(trustStoreType); + String trustStoreProvider = KeyStoreUtils.getKeyStoreProvider(trustStoreType); + if (StringUtils.isNoneEmpty(trustStoreProvider)) { + contextFactory.setTrustStoreProvider(trustStoreProvider); + } + } + if (StringUtils.isNotBlank(props.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD))) { + contextFactory.setTrustStorePassword(props.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD)); + } + } + + @Override + public void start() { + try { + // Create a standard extension manager and discover extensions + final ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager(); + extensionManager.discoverExtensions(systemBundle, bundles); + extensionManager.logClassLoaderMapping(); + + // Set the extension manager into the holder which makes it available to the Spring context via a factory bean + ExtensionManagerHolder.init(extensionManager); + + // Generate docs for extensions + DocGenerator.generate(props, extensionManager, extensionMapping); + + // start the server + server.start(); + + // ensure everything started successfully + for (Handler handler : server.getChildHandlers()) { + // see if the handler is a web app + if (handler instanceof WebAppContext) { + WebAppContext context = (WebAppContext) handler; + + // see if this webapp had any exceptions that would + // cause it to be unavailable + if (context.getUnavailableException() != null) { + startUpFailure(context.getUnavailableException()); + } + } + } + + // ensure the appropriate wars deployed successfully before injecting the NiFi context and security filters + // this must be done after starting the server (and ensuring there were no start up failures) + if (webApiContext != null) { + // give the web api the component ui extensions + final ServletContext webApiServletContext = webApiContext.getServletHandler().getServletContext(); + webApiServletContext.setAttribute("nifi-ui-extensions", componentUiExtensions); + + // get the application context + final WebApplicationContext webApplicationContext = WebApplicationContextUtils.getRequiredWebApplicationContext(webApiServletContext); + final NiFiWebConfigurationContext configurationContext = webApplicationContext.getBean("nifiWebConfigurationContext", NiFiWebConfigurationContext.class); + final FilterHolder securityFilter = webApiContext.getServletHandler().getFilter("springSecurityFilterChain"); + + // component ui extensions + performInjectionForComponentUis(componentUiExtensionWebContexts, configurationContext, securityFilter); + + // content viewer extensions + performInjectionForContentViewerUis(contentViewerWebContexts, securityFilter); + + // content viewer controller + if (webContentViewerContext != null) { + final ContentAccess contentAccess = webApplicationContext.getBean("contentAccess", ContentAccess.class); + + // add the content access + final ServletContext webContentViewerServletContext = webContentViewerContext.getServletHandler().getServletContext(); + webContentViewerServletContext.setAttribute("nifi-content-access", contentAccess); + + if (securityFilter != null) { + webContentViewerContext.addFilter(securityFilter, "/*", EnumSet.allOf(DispatcherType.class)); + } + } + } + + // ensure the web document war was loaded and provide the extension mapping + if (webDocsContext != null) { + final ServletContext webDocsServletContext = webDocsContext.getServletHandler().getServletContext(); + webDocsServletContext.setAttribute("nifi-extension-mapping", extensionMapping); + } + + // if this nifi is a node in a cluster, start the flow service and load the flow - the + // flow service is loaded here for clustered nodes because the loading of the flow will + // initialize the connection between the node and the NCM. if the node connects (starts + // heartbeating, etc), the NCM may issue web requests before the application (wars) have + // finished loading. this results in the node being disconnected since its unable to + // successfully respond to the requests. to resolve this, flow loading was moved to here + // (after the wars have been successfully deployed) when this nifi instance is a node + // in a cluster + if (props.isNode()) { + + FlowService flowService = null; + try { + + logger.info("Loading Flow..."); + + ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(webApiContext.getServletContext()); + flowService = ctx.getBean("flowService", FlowService.class); + + // start and load the flow + flowService.start(); + flowService.load(null); + + logger.info("Flow loaded successfully."); + + } catch (BeansException | LifeCycleStartException | IOException | FlowSerializationException | FlowSynchronizationException | UninheritableFlowException e) { + // ensure the flow service is terminated + if (flowService != null && flowService.isRunning()) { + flowService.stop(false); + } + logger.error("Unable to load flow due to: " + e, e); + throw new Exception("Unable to load flow due to: " + e); // cannot wrap the exception as they are not defined in a classloader accessible to the caller + } + } + + final NarLoader narLoader = new StandardNarLoader( + props.getExtensionsWorkingDirectory(), + props.getComponentDocumentationWorkingDirectory(), + NarClassLoadersHolder.getInstance(), + extensionManager, + extensionMapping, + this); + + narAutoLoader = new NarAutoLoader(props.getNarAutoLoadDirectory(), narLoader); + narAutoLoader.start(); + + URI jarsIndex = props.getDCAEJarIndexURI(); + + // REVIEW: Added ability to turn off the loaidng of dcae jars by providing no url + if (jarsIndex == null) { + StringBuilder sb = new StringBuilder(); + sb.append("Auto-loading of DCAE jars is turned off."); + sb.append(" You must set the value of \"nifi.dcae.jars.index.url\""); + sb.append(" to the full url to the index JSON of DCAE jars in the nifi.properties file"); + sb.append(" in order to activate this feature."); + logger.warn(sb.toString()); + } else { + this.dcaeAutoLoader = new DCAEAutoLoader(); + this.dcaeAutoLoader.start(jarsIndex, extensionManager); + } + + // dump the application url after confirming everything started successfully + dumpUrls(); + } catch (Exception ex) { + startUpFailure(ex); + } + } + + private void performInjectionForComponentUis(final Collection componentUiExtensionWebContexts, + final NiFiWebConfigurationContext configurationContext, final FilterHolder securityFilter) { + if (CollectionUtils.isNotEmpty(componentUiExtensionWebContexts)) { + for (final WebAppContext customUiContext : componentUiExtensionWebContexts) { + // set the NiFi context in each custom ui servlet context + final ServletContext customUiServletContext = customUiContext.getServletHandler().getServletContext(); + customUiServletContext.setAttribute("nifi-web-configuration-context", configurationContext); + + // add the security filter to any ui extensions wars + if (securityFilter != null) { + customUiContext.addFilter(securityFilter, "/*", EnumSet.allOf(DispatcherType.class)); + } + } + } + } + + private void performInjectionForContentViewerUis(final Collection contentViewerWebContexts, + final FilterHolder securityFilter) { + if (CollectionUtils.isNotEmpty(contentViewerWebContexts)) { + for (final WebAppContext contentViewerContext : contentViewerWebContexts) { + // add the security filter to any content viewer wars + if (securityFilter != null) { + contentViewerContext.addFilter(securityFilter, "/*", EnumSet.allOf(DispatcherType.class)); + } + } + } + } + + private void dumpUrls() throws SocketException { + final List urls = new ArrayList<>(); + + for (Connector connector : server.getConnectors()) { + if (connector instanceof ServerConnector) { + final ServerConnector serverConnector = (ServerConnector) connector; + + Set hosts = new HashSet<>(); + + // determine the hosts + if (StringUtils.isNotBlank(serverConnector.getHost())) { + hosts.add(serverConnector.getHost()); + } else { + Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces(); + if (networkInterfaces != null) { + for (NetworkInterface networkInterface : Collections.list(networkInterfaces)) { + for (InetAddress inetAddress : Collections.list(networkInterface.getInetAddresses())) { + hosts.add(inetAddress.getHostAddress()); + } + } + } + } + + // ensure some hosts were found + if (!hosts.isEmpty()) { + String scheme = "http"; + if (props.getSslPort() != null && serverConnector.getPort() == props.getSslPort()) { + scheme = "https"; + } + + // dump each url + for (String host : hosts) { + urls.add(String.format("%s://%s:%s", scheme, host, serverConnector.getPort())); + } + } + } + } + + if (urls.isEmpty()) { + logger.warn("NiFi has started, but the UI is not available on any hosts. Please verify the host properties."); + } else { + // log the ui location + logger.info("NiFi has started. The UI is available at the following URLs:"); + for (final String url : urls) { + logger.info(String.format("%s/nifi", url)); + } + } + } + + private void startUpFailure(Throwable t) { + System.err.println("Failed to start web server: " + t.getMessage()); + System.err.println("Shutting down..."); + logger.warn("Failed to start web server... shutting down.", t); + System.exit(1); + } + + @Override + public void setExtensionMapping(ExtensionMapping extensionMapping) { + this.extensionMapping = extensionMapping; + } + + @Override + public void setBundles(Bundle systemBundle, Set bundles) { + this.systemBundle = systemBundle; + this.bundles = bundles; + } + + @Override + public void stop() { + try { + server.stop(); + } catch (Exception ex) { + logger.warn("Failed to stop web server", ex); + } + + try { + if (narAutoLoader != null) { + narAutoLoader.stop(); + } + + if (dcaeAutoLoader != null) { + dcaeAutoLoader.stop(); + } + } catch (Exception e) { + logger.warn("Failed to stop NAR auto-loader", e); + } + } + + /** + * Holds the result of loading WARs for custom UIs. + */ + private static class ExtensionUiInfo { + + private final Collection webAppContexts; + private final Map mimeMappings; + private final Collection componentUiExtensionWebContexts; + private final Collection contentViewerWebContexts; + private final Map> componentUiExtensionsByType; + + public ExtensionUiInfo(final Collection webAppContexts, + final Map mimeMappings, + final Collection componentUiExtensionWebContexts, + final Collection contentViewerWebContexts, + final Map> componentUiExtensionsByType) { + this.webAppContexts = webAppContexts; + this.mimeMappings = mimeMappings; + this.componentUiExtensionWebContexts = componentUiExtensionWebContexts; + this.contentViewerWebContexts = contentViewerWebContexts; + this.componentUiExtensionsByType = componentUiExtensionsByType; + } + + public Collection getWebAppContexts() { + return webAppContexts; + } + + public Map getMimeMappings() { + return mimeMappings; + } + + public Collection getComponentUiExtensionWebContexts() { + return componentUiExtensionWebContexts; + } + + public Collection getContentViewerWebContexts() { + return contentViewerWebContexts; + } + + public Map> getComponentUiExtensionsByType() { + return componentUiExtensionsByType; + } + } +} + +@FunctionalInterface +interface ServerConnectorCreator { + ServerConnector create(Server server, HttpConfiguration httpConfiguration); +} -- cgit 1.2.3-korg