summaryrefslogtreecommitdiffstats
path: root/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
diff options
context:
space:
mode:
Diffstat (limited to 'mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java')
-rw-r--r--mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java4899
1 files changed, 4899 insertions, 0 deletions
diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
new file mode 100644
index 0000000..8ad05bd
--- /dev/null
+++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -0,0 +1,4899 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Modifications to the original nifi code for the ONAP project are made
+ * available under the Apache License, Version 2.0
+ */
+package org.apache.nifi.web;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.nifi.action.Action;
+import org.apache.nifi.action.Component;
+import org.apache.nifi.action.FlowChangeAction;
+import org.apache.nifi.action.Operation;
+import org.apache.nifi.action.details.FlowChangePurgeDetails;
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.AccessPolicy;
+import org.apache.nifi.authorization.AuthorizableLookup;
+import org.apache.nifi.authorization.AuthorizationRequest;
+import org.apache.nifi.authorization.AuthorizationResult;
+import org.apache.nifi.authorization.AuthorizationResult.Result;
+import org.apache.nifi.authorization.AuthorizeAccess;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.Group;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.User;
+import org.apache.nifi.authorization.UserContextKeys;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource;
+import org.apache.nifi.authorization.resource.OperationAuthorizable;
+import org.apache.nifi.authorization.resource.ResourceFactory;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
+import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
+import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
+import org.apache.nifi.cluster.event.NodeEvent;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.Counter;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.Snippet;
+import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.controller.repository.claim.ContentDirection;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceReference;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.ProcessGroupCounts;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.history.History;
+import org.apache.nifi.history.HistoryQuery;
+import org.apache.nifi.history.PreviousValue;
+import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.authorization.Permissions;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.util.BundleUtils;
+import org.apache.nifi.util.FlowDifferenceFilters;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.api.dto.AccessPolicyDTO;
+import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
+import org.apache.nifi.web.api.dto.AffectedComponentDTO;
+import org.apache.nifi.web.api.dto.BucketDTO;
+import org.apache.nifi.web.api.dto.BulletinBoardDTO;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.dto.BulletinQueryDTO;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ClusterDTO;
+import org.apache.nifi.web.api.dto.ComponentDTO;
+import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
+import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
+import org.apache.nifi.web.api.dto.ComponentReferenceDTO;
+import org.apache.nifi.web.api.dto.ComponentRestrictionPermissionDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.CounterDTO;
+import org.apache.nifi.web.api.dto.CountersDTO;
+import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.dto.DtoFactory;
+import org.apache.nifi.web.api.dto.EntityFactory;
+import org.apache.nifi.web.api.dto.FlowConfigurationDTO;
+import org.apache.nifi.web.api.dto.FlowFileDTO;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.FunnelDTO;
+import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.PermissionsDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.PreviousValueDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
+import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
+import org.apache.nifi.web.api.dto.RegistryDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.dto.RequiredPermissionDTO;
+import org.apache.nifi.web.api.dto.ResourceDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.SnippetDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.TemplateDTO;
+import org.apache.nifi.web.api.dto.UserDTO;
+import org.apache.nifi.web.api.dto.UserGroupDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+import org.apache.nifi.web.api.dto.VersionedFlowDTO;
+import org.apache.nifi.web.api.dto.action.HistoryDTO;
+import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
+import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.flow.FlowDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
+import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
+import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.entity.AccessPolicyEntity;
+import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
+import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
+import org.apache.nifi.web.api.entity.BucketEntity;
+import org.apache.nifi.web.api.entity.BulletinEntity;
+import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
+import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
+import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.CurrentUserEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
+import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
+import org.apache.nifi.web.api.entity.FlowEntity;
+import org.apache.nifi.web.api.entity.FunnelEntity;
+import org.apache.nifi.web.api.entity.LabelEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
+import org.apache.nifi.web.api.entity.PortStatusEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
+import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
+import org.apache.nifi.web.api.entity.RegistryClientEntity;
+import org.apache.nifi.web.api.entity.RegistryEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
+import org.apache.nifi.web.api.entity.SnippetEntity;
+import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
+import org.apache.nifi.web.api.entity.StatusHistoryEntity;
+import org.apache.nifi.web.api.entity.TemplateEntity;
+import org.apache.nifi.web.api.entity.TenantEntity;
+import org.apache.nifi.web.api.entity.UserEntity;
+import org.apache.nifi.web.api.entity.UserGroupEntity;
+import org.apache.nifi.web.api.entity.VariableEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
+import org.apache.nifi.web.controller.ControllerFacade;
+import org.apache.nifi.web.dao.AccessPolicyDAO;
+import org.apache.nifi.web.dao.ConnectionDAO;
+import org.apache.nifi.web.dao.ControllerServiceDAO;
+import org.apache.nifi.web.dao.FunnelDAO;
+import org.apache.nifi.web.dao.LabelDAO;
+import org.apache.nifi.web.dao.PortDAO;
+import org.apache.nifi.web.dao.ProcessGroupDAO;
+import org.apache.nifi.web.dao.ProcessorDAO;
+import org.apache.nifi.web.dao.RegistryDAO;
+import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
+import org.apache.nifi.web.dao.ReportingTaskDAO;
+import org.apache.nifi.web.dao.SnippetDAO;
+import org.apache.nifi.web.dao.TemplateDAO;
+import org.apache.nifi.web.dao.UserDAO;
+import org.apache.nifi.web.dao.UserGroupDAO;
+import org.apache.nifi.web.revision.DeleteRevisionTask;
+import org.apache.nifi.web.revision.ExpiredRevisionClaimException;
+import org.apache.nifi.web.revision.RevisionClaim;
+import org.apache.nifi.web.revision.RevisionManager;
+import org.apache.nifi.web.revision.RevisionUpdate;
+import org.apache.nifi.web.revision.StandardRevisionClaim;
+import org.apache.nifi.web.revision.StandardRevisionUpdate;
+import org.apache.nifi.web.revision.UpdateRevisionTask;
+import org.apache.nifi.web.util.SnippetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Implementation of NiFiServiceFacade that performs revision checking.
+ */
+public class StandardNiFiServiceFacade implements NiFiServiceFacade {
+ private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
+ private static final int VALIDATION_WAIT_MILLIS = 50;
+
+ // nifi core components
+ private ControllerFacade controllerFacade;
+ private SnippetUtils snippetUtils;
+
+ // revision manager
+ private RevisionManager revisionManager;
+ private BulletinRepository bulletinRepository;
+
+ // data access objects
+ private ProcessorDAO processorDAO;
+ private ProcessGroupDAO processGroupDAO;
+ private RemoteProcessGroupDAO remoteProcessGroupDAO;
+ private LabelDAO labelDAO;
+ private FunnelDAO funnelDAO;
+ private SnippetDAO snippetDAO;
+ private PortDAO inputPortDAO;
+ private PortDAO outputPortDAO;
+ private ConnectionDAO connectionDAO;
+ private ControllerServiceDAO controllerServiceDAO;
+ private ReportingTaskDAO reportingTaskDAO;
+ private TemplateDAO templateDAO;
+ private UserDAO userDAO;
+ private UserGroupDAO userGroupDAO;
+ private AccessPolicyDAO accessPolicyDAO;
+ private RegistryDAO registryDAO;
+ private ClusterCoordinator clusterCoordinator;
+ private HeartbeatMonitor heartbeatMonitor;
+ private LeaderElectionManager leaderElectionManager;
+
+ // administrative services
+ private AuditService auditService;
+
+ // flow registry
+ private FlowRegistryClient flowRegistryClient;
+
+ // properties
+ private NiFiProperties properties;
+ private DtoFactory dtoFactory;
+ private EntityFactory entityFactory;
+
+ private Authorizer authorizer;
+
+ private AuthorizableLookup authorizableLookup;
+
+ // -----------------------------------------
+ // Synchronization methods
+ // -----------------------------------------
+ @Override
+ public void authorizeAccess(final AuthorizeAccess authorizeAccess) {
+ authorizeAccess.authorize(authorizableLookup);
+ }
+
+ @Override
+ public void verifyRevision(final Revision revision, final NiFiUser user) {
+ final Revision curRevision = revisionManager.getRevision(revision.getComponentId());
+ if (revision.equals(curRevision)) {
+ return;
+ }
+
+ throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified");
+ }
+
+ @Override
+ public void verifyRevisions(final Set<Revision> revisions, final NiFiUser user) {
+ for (final Revision revision : revisions) {
+ verifyRevision(revision, user);
+ }
+ }
+
+ @Override
+ public Set<Revision> getRevisionsFromGroup(final String groupId, final Function<ProcessGroup, Set<String>> getComponents) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+ final Set<String> componentIds = getComponents.apply(group);
+ return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<Revision> getRevisionsFromSnippet(final String snippetId) {
+ final Snippet snippet = snippetDAO.getSnippet(snippetId);
+ final Set<String> componentIds = new HashSet<>();
+ componentIds.addAll(snippet.getProcessors().keySet());
+ componentIds.addAll(snippet.getFunnels().keySet());
+ componentIds.addAll(snippet.getLabels().keySet());
+ componentIds.addAll(snippet.getConnections().keySet());
+ componentIds.addAll(snippet.getInputPorts().keySet());
+ componentIds.addAll(snippet.getOutputPorts().keySet());
+ componentIds.addAll(snippet.getProcessGroups().keySet());
+ componentIds.addAll(snippet.getRemoteProcessGroups().keySet());
+ return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet());
+ }
+
+ // -----------------------------------------
+ // Verification Operations
+ // -----------------------------------------
+
+ @Override
+ public void verifyListQueue(final String connectionId) {
+ connectionDAO.verifyList(connectionId);
+ }
+
+ @Override
+ public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) {
+ connectionDAO.verifyCreate(groupId, connectionDTO);
+ }
+
+ @Override
+ public void verifyUpdateConnection(final ConnectionDTO connectionDTO) {
+ // if connection does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (connectionDAO.hasConnection(connectionDTO.getId())) {
+ connectionDAO.verifyUpdate(connectionDTO);
+ } else {
+ connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteConnection(final String connectionId) {
+ connectionDAO.verifyDelete(connectionId);
+ }
+
+ @Override
+ public void verifyDeleteFunnel(final String funnelId) {
+ funnelDAO.verifyDelete(funnelId);
+ }
+
+ @Override
+ public void verifyUpdateInputPort(final PortDTO inputPortDTO) {
+ // if connection does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (inputPortDAO.hasPort(inputPortDTO.getId())) {
+ inputPortDAO.verifyUpdate(inputPortDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteInputPort(final String inputPortId) {
+ inputPortDAO.verifyDelete(inputPortId);
+ }
+
+ @Override
+ public void verifyUpdateOutputPort(final PortDTO outputPortDTO) {
+ // if connection does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (outputPortDAO.hasPort(outputPortDTO.getId())) {
+ outputPortDAO.verifyUpdate(outputPortDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteOutputPort(final String outputPortId) {
+ outputPortDAO.verifyDelete(outputPortId);
+ }
+
+ @Override
+ public void verifyCreateProcessor(ProcessorDTO processorDTO) {
+ processorDAO.verifyCreate(processorDTO);
+ }
+
+ @Override
+ public void verifyUpdateProcessor(final ProcessorDTO processorDTO) {
+ // if group does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (processorDAO.hasProcessor(processorDTO.getId())) {
+ processorDAO.verifyUpdate(processorDTO);
+ } else {
+ verifyCreateProcessor(processorDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteProcessor(final String processorId) {
+ processorDAO.verifyDelete(processorId);
+ }
+
+ @Override
+ public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
+ processGroupDAO.verifyScheduleComponents(groupId, state, componentIds);
+ }
+
+ @Override
+ public void verifyEnableComponents(String processGroupId, ScheduledState state, Set<String> componentIds) {
+ processGroupDAO.verifyEnableComponents(processGroupId, state, componentIds);
+ }
+
+ @Override
+ public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
+ processGroupDAO.verifyActivateControllerServices(state, serviceIds);
+ }
+
+ @Override
+ public void verifyDeleteProcessGroup(final String groupId) {
+ processGroupDAO.verifyDelete(groupId);
+ }
+
+ @Override
+ public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) {
+ // if remote group does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) {
+ remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO);
+ }
+ }
+
+ @Override
+ public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
+ }
+
+ @Override
+ public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
+ }
+
+ @Override
+ public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) {
+ remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
+ }
+
+ @Override
+ public void verifyCreateControllerService(ControllerServiceDTO controllerServiceDTO) {
+ controllerServiceDAO.verifyCreate(controllerServiceDTO);
+ }
+
+ @Override
+ public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) {
+ // if service does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) {
+ controllerServiceDAO.verifyUpdate(controllerServiceDTO);
+ } else {
+ verifyCreateControllerService(controllerServiceDTO);
+ }
+ }
+
+ @Override
+ public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
+ controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+ }
+
+ @Override
+ public void verifyDeleteControllerService(final String controllerServiceId) {
+ controllerServiceDAO.verifyDelete(controllerServiceId);
+ }
+
+ @Override
+ public void verifyCreateReportingTask(ReportingTaskDTO reportingTaskDTO) {
+ reportingTaskDAO.verifyCreate(reportingTaskDTO);
+ }
+
+ @Override
+ public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) {
+ // if tasks does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) {
+ reportingTaskDAO.verifyUpdate(reportingTaskDTO);
+ } else {
+ verifyCreateReportingTask(reportingTaskDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteReportingTask(final String reportingTaskId) {
+ reportingTaskDAO.verifyDelete(reportingTaskId);
+ }
+
+ // -----------------------------------------
+ // Write Operations
+ // -----------------------------------------
+
+ @Override
+ public AccessPolicyEntity updateAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) {
+ final Authorizable authorizable = authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId());
+ final RevisionUpdate<AccessPolicyDTO> snapshot = updateComponent(revision,
+ authorizable,
+ () -> accessPolicyDAO.updateAccessPolicy(accessPolicyDTO),
+ accessPolicy -> {
+ final Set<TenantEntity> users = accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
+ final Set<TenantEntity> userGroups = accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
+ final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
+ return dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference);
+ });
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizable);
+ return entityFactory.createAccessPolicyEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
+ }
+
+ @Override
+ public UserEntity updateUser(final Revision revision, final UserDTO userDTO) {
+ final Authorizable usersAuthorizable = authorizableLookup.getTenant();
+ final Set<Group> groups = userGroupDAO.getUserGroupsForUser(userDTO.getId());
+ final Set<AccessPolicy> policies = userGroupDAO.getAccessPoliciesForUser(userDTO.getId());
+ final RevisionUpdate<UserDTO> snapshot = updateComponent(revision,
+ usersAuthorizable,
+ () -> userDAO.updateUser(userDTO),
+ user -> {
+ final Set<TenantEntity> tenantEntities = groups.stream().map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
+ final Set<AccessPolicySummaryEntity> policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
+ return dtoFactory.createUserDto(user, tenantEntities, policyEntities);
+ });
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(usersAuthorizable);
+ return entityFactory.createUserEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
+ }
+
+ @Override
+ public UserGroupEntity updateUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) {
+ final Authorizable userGroupsAuthorizable = authorizableLookup.getTenant();
+ final Set<AccessPolicy> policies = userGroupDAO.getAccessPoliciesForUserGroup(userGroupDTO.getId());
+ final RevisionUpdate<UserGroupDTO> snapshot = updateComponent(revision,
+ userGroupsAuthorizable,
+ () -> userGroupDAO.updateUserGroup(userGroupDTO),
+ userGroup -> {
+ final Set<TenantEntity> tenantEntities = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
+ final Set<AccessPolicySummaryEntity> policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
+ return dtoFactory.createUserGroupDto(userGroup, tenantEntities, policyEntities);
+ }
+ );
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(userGroupsAuthorizable);
+ return entityFactory.createUserGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
+ }
+
+ @Override
+ public ConnectionEntity updateConnection(final Revision revision, final ConnectionDTO connectionDTO) {
+ final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId());
+
+ final RevisionUpdate<ConnectionDTO> snapshot = updateComponent(
+ revision,
+ connectionNode,
+ () -> connectionDAO.updateConnection(connectionDTO),
+ connection -> dtoFactory.createConnectionDto(connection));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectionNode);
+ final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionNode.getIdentifier()));
+ return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status);
+ }
+
+ @Override
+ public ProcessorEntity updateProcessor(final Revision revision, final ProcessorDTO processorDTO) {
+ // get the component, ensure we have access to it, and perform the update request
+ final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId());
+ final RevisionUpdate<ProcessorDTO> snapshot = updateComponent(revision,
+ processorNode,
+ () -> processorDAO.updateProcessor(processorDTO),
+ proc -> {
+ awaitValidationCompletion(proc);
+ return dtoFactory.createProcessorDto(proc);
+ });
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processorNode);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processorNode));
+ final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorNode.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ private void awaitValidationCompletion(final ComponentNode component) {
+ component.getValidationStatus(VALIDATION_WAIT_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public LabelEntity updateLabel(final Revision revision, final LabelDTO labelDTO) {
+ final Label labelNode = labelDAO.getLabel(labelDTO.getId());
+ final RevisionUpdate<LabelDTO> snapshot = updateComponent(revision,
+ labelNode,
+ () -> labelDAO.updateLabel(labelDTO),
+ label -> dtoFactory.createLabelDto(label));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(labelNode);
+ return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
+ }
+
+ @Override
+ public FunnelEntity updateFunnel(final Revision revision, final FunnelDTO funnelDTO) {
+ final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId());
+ final RevisionUpdate<FunnelDTO> snapshot = updateComponent(revision,
+ funnelNode,
+ () -> funnelDAO.updateFunnel(funnelDTO),
+ funnel -> dtoFactory.createFunnelDto(funnel));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnelNode);
+ return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
+ }
+
+
+ /**
+ * Updates a component with the given revision, using the provided supplier to call
+ * into the appropriate DAO and the provided function to convert the component into a DTO.
+ *
+ * @param revision the current revision
+ * @param daoUpdate a Supplier that will update the component via the appropriate DAO
+ * @param dtoCreation a Function to convert a component into a dao
+ * @param <D> the DTO Type of the updated component
+ * @param <C> the Component Type of the updated component
+ * @return A RevisionUpdate that represents the new configuration
+ */
+ private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
+ try {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() {
+ @Override
+ public RevisionUpdate<D> update() {
+ // get the updated component
+ final C component = daoUpdate.get();
+
+ // save updated controller
+ controllerFacade.save();
+
+ final D dto = dtoCreation.apply(component);
+
+ final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
+ final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastModification);
+ }
+ });
+
+ return updatedComponent;
+ } catch (final ExpiredRevisionClaimException erce) {
+ throw new InvalidRevisionException("Failed to update component " + authorizable, erce);
+ }
+ }
+
+
+ @Override
+ public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set<String> affectedComponentIds) {
+ // if snippet does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (snippetDAO.hasSnippet(snippetDto.getId())) {
+ snippetDAO.verifyUpdateSnippetComponent(snippetDto);
+ }
+ }
+
+ @Override
+ public SnippetEntity updateSnippet(final Set<Revision> revisions, final SnippetDTO snippetDto) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
+
+ final RevisionUpdate<SnippetDTO> snapshot;
+ try {
+ snapshot = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() {
+ @Override
+ public RevisionUpdate<SnippetDTO> update() {
+ // get the updated component
+ final Snippet snippet = snippetDAO.updateSnippetComponents(snippetDto);
+
+ // drop the snippet
+ snippetDAO.dropSnippet(snippet.getId());
+
+ // save updated controller
+ controllerFacade.save();
+
+ // increment the revisions
+ final Set<Revision> updatedRevisions = revisions.stream().map(revision -> {
+ final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
+ return currentRevision.incrementRevision(revision.getClientId());
+ }).collect(Collectors.toSet());
+
+ final SnippetDTO dto = dtoFactory.createSnippetDto(snippet);
+ return new StandardRevisionUpdate<>(dto, null, updatedRevisions);
+ }
+ });
+ } catch (final ExpiredRevisionClaimException e) {
+ throw new InvalidRevisionException("Failed to update Snippet", e);
+ }
+
+ return entityFactory.createSnippetEntity(snapshot.getComponent());
+ }
+
+ @Override
+ public PortEntity updateInputPort(final Revision revision, final PortDTO inputPortDTO) {
+ final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId());
+ final RevisionUpdate<PortDTO> snapshot = updateComponent(revision,
+ inputPortNode,
+ () -> inputPortDAO.updatePort(inputPortDTO),
+ port -> dtoFactory.createPortDto(port));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPortNode);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPortNode));
+ final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortNode.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public PortEntity updateOutputPort(final Revision revision, final PortDTO outputPortDTO) {
+ final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId());
+ final RevisionUpdate<PortDTO> snapshot = updateComponent(revision,
+ outputPortNode,
+ () -> outputPortDAO.updatePort(outputPortDTO),
+ port -> dtoFactory.createPortDto(port));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPortNode);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPortNode), NiFiUserUtils.getNiFiUser());
+ final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortNode.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public RemoteProcessGroupEntity updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
+ final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
+ final RevisionUpdate<RemoteProcessGroupDTO> snapshot = updateComponent(
+ revision,
+ remoteProcessGroupNode,
+ () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO),
+ remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
+ final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+ final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupNode,
+ controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroupNode.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort(
+ final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+
+ final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId());
+ final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent(
+ revision,
+ remoteProcessGroupNode,
+ () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO),
+ remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
+ final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+ return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions);
+ }
+
+ @Override
+ public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort(
+ final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+
+ final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId());
+ final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent(
+ revision,
+ remoteProcessGroupNode,
+ () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO),
+ remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
+ final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+ return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions);
+ }
+
+ @Override
+ public Set<AffectedComponentDTO> getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+ if (group == null) {
+ throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
+ }
+
+ final Map<String, String> variableMap = new HashMap<>();
+ variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
+ .map(VariableEntity::getVariable)
+ .forEach(var -> variableMap.put(var.getName(), var.getValue()));
+
+ final Set<AffectedComponentDTO> affectedComponentDtos = new HashSet<>();
+
+ final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap);
+ for (final String variableName : updatedVariableNames) {
+ final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName);
+
+ for (final ComponentNode component : affectedComponents) {
+ if (component instanceof ProcessorNode) {
+ final ProcessorNode procNode = (ProcessorNode) component;
+ if (procNode.isRunning()) {
+ affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode));
+ }
+ } else if (component instanceof ControllerServiceNode) {
+ final ControllerServiceNode serviceNode = (ControllerServiceNode) component;
+ if (serviceNode.isActive()) {
+ affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode));
+ }
+ } else {
+ throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable");
+ }
+ }
+ }
+
+ return affectedComponentDtos;
+ }
+
+ @Override
+ public Set<AffectedComponentEntity> getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+ if (group == null) {
+ throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
+ }
+
+ final Map<String, String> variableMap = new HashMap<>();
+ variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
+ .map(VariableEntity::getVariable)
+ .forEach(var -> variableMap.put(var.getName(), var.getValue()));
+
+ final Set<AffectedComponentEntity> affectedComponentEntities = new HashSet<>();
+
+ final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap);
+ for (final String variableName : updatedVariableNames) {
+ final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName);
+ affectedComponentEntities.addAll(dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager));
+ }
+
+ return affectedComponentEntities;
+ }
+
+ private Set<String> getUpdatedVariables(final ProcessGroup group, final Map<String, String> newVariableValues) {
+ final Set<String> updatedVariableNames = new HashSet<>();
+
+ final ComponentVariableRegistry registry = group.getVariableRegistry();
+ for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) {
+ final String varName = entry.getKey();
+ final String newValue = entry.getValue();
+
+ final String curValue = registry.getVariableValue(varName);
+ if (!Objects.equals(newValue, curValue)) {
+ updatedVariableNames.add(varName);
+ }
+ }
+
+ return updatedVariableNames;
+ }
+
+
+ @Override
+ public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) {
+ final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+ final RevisionUpdate<VariableRegistryDTO> snapshot = updateComponent(revision,
+ processGroupNode,
+ () -> processGroupDAO.updateVariableRegistry(variableRegistryDto),
+ processGroup -> dtoFactory.createVariableRegistryDto(processGroup, revisionManager));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
+ final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+ return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions);
+ }
+
+
+ @Override
+ public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) {
+ final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId());
+ final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision,
+ processGroupNode,
+ () -> processGroupDAO.updateProcessGroup(processGroupDTO),
+ processGroup -> dtoFactory.createProcessGroupDto(processGroup));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
+ final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+ final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
+ }
+
+ @Override
+ public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
+ if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
+ processGroupDAO.verifyUpdate(processGroupDTO);
+ }
+ }
+
+ @Override
+ public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
+ UpdateRevisionTask<ScheduleComponentsEntity>() {
+ @Override
+ public RevisionUpdate<ScheduleComponentsEntity> update() {
+ // schedule the components
+ processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet());
+
+ // update the revisions
+ final Map<String, Revision> updatedRevisions = new HashMap<>();
+ for (final Revision revision : componentRevisions.values()) {
+ final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
+ updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
+ }
+
+ // save
+ controllerFacade.save();
+
+ // gather details for response
+ final ScheduleComponentsEntity entity = new ScheduleComponentsEntity();
+ entity.setId(processGroupId);
+ entity.setState(state.name());
+ return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
+ }
+ });
+
+ return updatedComponent.getComponent();
+ }
+
+ @Override
+ public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
+ UpdateRevisionTask<ScheduleComponentsEntity>() {
+ @Override
+ public RevisionUpdate<ScheduleComponentsEntity> update() {
+ // schedule the components
+ processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
+
+ // update the revisions
+ final Map<String, Revision> updatedRevisions = new HashMap<>();
+ for (final Revision revision : componentRevisions.values()) {
+ final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
+ updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
+ }
+
+ // save
+ controllerFacade.save();
+
+ // gather details for response
+ final ScheduleComponentsEntity entity = new ScheduleComponentsEntity();
+ entity.setId(processGroupId);
+ entity.setState(state.name());
+ return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
+ }
+ });
+
+ return updatedComponent.getComponent();
+ }
+
+ @Override
+ public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final RevisionUpdate<ActivateControllerServicesEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user,
+ new UpdateRevisionTask<ActivateControllerServicesEntity>() {
+ @Override
+ public RevisionUpdate<ActivateControllerServicesEntity> update() {
+ // schedule the components
+ processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet());
+
+ // update the revisions
+ final Map<String, Revision> updatedRevisions = new HashMap<>();
+ for (final Revision revision : serviceRevisions.values()) {
+ final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
+ updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
+ }
+
+ // save
+ controllerFacade.save();
+
+ // gather details for response
+ final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity();
+ entity.setId(processGroupId);
+ entity.setState(state.name());
+ return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
+ }
+ });
+
+ return updatedComponent.getComponent();
+ }
+
+
+ @Override
+ public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
+ final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = updateComponent(
+ revision,
+ controllerFacade,
+ () -> {
+ if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
+ controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
+ }
+ if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
+ controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
+ }
+
+ return controllerConfigurationDTO;
+ },
+ controller -> dtoFactory.createControllerConfigurationDto(controllerFacade));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade);
+ final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(updatedComponent.getLastModification());
+ return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions);
+ }
+
+
+ @Override
+ public NodeDTO updateNode(final NodeDTO nodeDTO) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+ }
+ final String userDn = user.getIdentity();
+
+ final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeDTO.getNodeId());
+ if (nodeId == null) {
+ throw new UnknownNodeException("No node exists with ID " + nodeDTO.getNodeId());
+ }
+
+
+ if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
+ clusterCoordinator.requestNodeConnect(nodeId, userDn);
+ } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
+ clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED,
+ "User " + userDn + " requested that node be offloaded");
+ } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
+ clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED,
+ "User " + userDn + " requested that node be disconnected from cluster");
+ }
+
+ return getNode(nodeId);
+ }
+
+ @Override
+ public CounterDTO updateCounter(final String counterId) {
+ return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId));
+ }
+
+ @Override
+ public void verifyCanClearProcessorState(final String processorId) {
+ processorDAO.verifyClearState(processorId);
+ }
+
+ @Override
+ public void clearProcessorState(final String processorId) {
+ processorDAO.clearState(processorId);
+ }
+
+ @Override
+ public void verifyCanClearControllerServiceState(final String controllerServiceId) {
+ controllerServiceDAO.verifyClearState(controllerServiceId);
+ }
+
+ @Override
+ public void clearControllerServiceState(final String controllerServiceId) {
+ controllerServiceDAO.clearState(controllerServiceId);
+ }
+
+ @Override
+ public void verifyCanClearReportingTaskState(final String reportingTaskId) {
+ reportingTaskDAO.verifyClearState(reportingTaskId);
+ }
+
+ @Override
+ public void clearReportingTaskState(final String reportingTaskId) {
+ reportingTaskDAO.clearState(reportingTaskId);
+ }
+
+ @Override
+ public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) {
+ final Connection connection = connectionDAO.getConnection(connectionId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
+ final ConnectionDTO snapshot = deleteComponent(
+ revision,
+ connection.getResource(),
+ () -> connectionDAO.deleteConnection(connectionId),
+ false, // no policies to remove
+ dtoFactory.createConnectionDto(connection));
+
+ return entityFactory.createConnectionEntity(snapshot, null, permissions, null);
+ }
+
+ @Override
+ public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) {
+ return dtoFactory.createDropRequestDTO(connectionDAO.deleteFlowFileDropRequest(connectionId, dropRequestId));
+ }
+
+ @Override
+ public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) {
+ final Connection connection = connectionDAO.getConnection(connectionId);
+ final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(connectionId, listingRequestId));
+
+ // include whether the source and destination are running
+ if (connection.getSource() != null) {
+ listRequest.setSourceRunning(connection.getSource().isRunning());
+ }
+ if (connection.getDestination() != null) {
+ listRequest.setDestinationRunning(connection.getDestination().isRunning());
+ }
+
+ return listRequest;
+ }
+
+ @Override
+ public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) {
+ final ProcessorNode processor = processorDAO.getProcessor(processorId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
+ final ProcessorDTO snapshot = deleteComponent(
+ revision,
+ processor.getResource(),
+ () -> processorDAO.deleteProcessor(processorId),
+ true,
+ dtoFactory.createProcessorDto(processor));
+
+ return entityFactory.createProcessorEntity(snapshot, null, permissions, operatePermissions, null, null);
+ }
+
+ @Override
+ public ProcessorEntity terminateProcessor(final String processorId) {
+ processorDAO.terminate(processorId);
+ return getProcessor(processorId);
+ }
+
+ @Override
+ public void verifyTerminateProcessor(final String processorId) {
+ processorDAO.verifyTerminate(processorId);
+ }
+
+ @Override
+ public LabelEntity deleteLabel(final Revision revision, final String labelId) {
+ final Label label = labelDAO.getLabel(labelId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
+ final LabelDTO snapshot = deleteComponent(
+ revision,
+ label.getResource(),
+ () -> labelDAO.deleteLabel(labelId),
+ true,
+ dtoFactory.createLabelDto(label));
+
+ return entityFactory.createLabelEntity(snapshot, null, permissions);
+ }
+
+ @Override
+ public UserEntity deleteUser(final Revision revision, final String userId) {
+ final User user = userDAO.getUser(userId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
+ final Set<TenantEntity> userGroups = user != null ? userGroupDAO.getUserGroupsForUser(userId).stream()
+ .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
+ final Set<AccessPolicySummaryEntity> policyEntities = user != null ? userGroupDAO.getAccessPoliciesForUser(userId).stream()
+ .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()) : null;
+
+ final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userId;
+ final UserDTO snapshot = deleteComponent(
+ revision,
+ new Resource() {
+ @Override
+ public String getIdentifier() {
+ return resourceIdentifier;
+ }
+
+ @Override
+ public String getName() {
+ return resourceIdentifier;
+ }
+
+ @Override
+ public String getSafeDescription() {
+ return "User " + userId;
+ }
+ },
+ () -> userDAO.deleteUser(userId),
+ false, // no user specific policies to remove
+ dtoFactory.createUserDto(user, userGroups, policyEntities));
+
+ return entityFactory.createUserEntity(snapshot, null, permissions);
+ }
+
+ @Override
+ public UserGroupEntity deleteUserGroup(final Revision revision, final String userGroupId) {
+ final Group userGroup = userGroupDAO.getUserGroup(userGroupId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
+ final Set<TenantEntity> users = userGroup != null ? userGroup.getUsers().stream()
+ .map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
+ final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream()
+ .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
+
+ final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userGroupId;
+ final UserGroupDTO snapshot = deleteComponent(
+ revision,
+ new Resource() {
+ @Override
+ public String getIdentifier() {
+ return resourceIdentifier;
+ }
+
+ @Override
+ public String getName() {
+ return resourceIdentifier;
+ }
+
+ @Override
+ public String getSafeDescription() {
+ return "User Group " + userGroupId;
+ }
+ },
+ () -> userGroupDAO.deleteUserGroup(userGroupId),
+ false, // no user group specific policies to remove
+ dtoFactory.createUserGroupDto(userGroup, users, policyEntities));
+
+ return entityFactory.createUserGroupEntity(snapshot, null, permissions);
+ }
+
+ @Override
+ public AccessPolicyEntity deleteAccessPolicy(final Revision revision, final String accessPolicyId) {
+ final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId);
+ final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyId));
+ final Set<TenantEntity> userGroups = accessPolicy != null ? accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
+ final Set<TenantEntity> users = accessPolicy != null ? accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
+ final AccessPolicyDTO snapshot = deleteComponent(
+ revision,
+ new Resource() {
+ @Override
+ public String getIdentifier() {
+ return accessPolicy.getResource();
+ }
+
+ @Override
+ public String getName() {
+ return accessPolicy.getResource();
+ }
+
+ @Override
+ public String getSafeDescription() {
+ return "Policy " + accessPolicyId;
+ }
+ },
+ () -> accessPolicyDAO.deleteAccessPolicy(accessPolicyId),
+ false, // no need to clean up any policies as it's already been removed above
+ dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference));
+
+ return entityFactory.createAccessPolicyEntity(snapshot, null, permissions);
+ }
+
+ @Override
+ public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) {
+ final Funnel funnel = funnelDAO.getFunnel(funnelId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
+ final FunnelDTO snapshot = deleteComponent(
+ revision,
+ funnel.getResource(),
+ () -> funnelDAO.deleteFunnel(funnelId),
+ true,
+ dtoFactory.createFunnelDto(funnel));
+
+ return entityFactory.createFunnelEntity(snapshot, null, permissions);
+ }
+
+ /**
+ * Deletes a component using the Optimistic Locking Manager
+ *
+ * @param revision the current revision
+ * @param resource the resource being removed
+ * @param deleteAction the action that deletes the component via the appropriate DAO object
+ * @param cleanUpPolicies whether or not the policies for this resource should be removed as well - not necessary when there are
+ * no component specific policies or if the policies of the component are inherited
+ * @return a dto that represents the new configuration
+ */
+ private <D, C> D deleteComponent(final Revision revision, final Resource resource, final Runnable deleteAction, final boolean cleanUpPolicies, final D dto) {
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<D>() {
+ @Override
+ public D performTask() {
+ logger.debug("Attempting to delete component {} with claim {}", resource.getIdentifier(), claim);
+
+ // run the delete action
+ deleteAction.run();
+
+ // save the flow
+ controllerFacade.save();
+ logger.debug("Deletion of component {} was successful", resource.getIdentifier());
+
+ if (cleanUpPolicies) {
+ cleanUpPolicies(resource);
+ }
+
+ return dto;
+ }
+ });
+ }
+
+ /**
+ * Clean up the policies for the specified component resource.
+ *
+ * @param componentResource the resource for the component
+ */
+ private void cleanUpPolicies(final Resource componentResource) {
+ // ensure the authorizer supports configuration
+ if (accessPolicyDAO.supportsConfigurableAuthorizer()) {
+ final List<Resource> resources = new ArrayList<>();
+ resources.add(componentResource);
+ resources.add(ResourceFactory.getDataResource(componentResource));
+ resources.add(ResourceFactory.getProvenanceDataResource(componentResource));
+ resources.add(ResourceFactory.getDataTransferResource(componentResource));
+ resources.add(ResourceFactory.getPolicyResource(componentResource));
+
+ for (final Resource resource : resources) {
+ for (final RequestAction action : RequestAction.values()) {
+ try {
+ // since the component is being deleted, also delete any relevant access policies
+ final AccessPolicy readPolicy = accessPolicyDAO.getAccessPolicy(action, resource.getIdentifier());
+ if (readPolicy != null) {
+ accessPolicyDAO.deleteAccessPolicy(readPolicy.getIdentifier());
+ }
+ } catch (final Exception e) {
+ logger.warn(String.format("Unable to remove access policy for %s %s after component removal.", action, resource.getIdentifier()), e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void verifyDeleteSnippet(final String snippetId, final Set<String> affectedComponentIds) {
+ snippetDAO.verifyDeleteSnippetComponents(snippetId);
+ }
+
+ @Override
+ public SnippetEntity deleteSnippet(final Set<Revision> revisions, final String snippetId) {
+ final Snippet snippet = snippetDAO.getSnippet(snippetId);
+
+ // grab the resources in the snippet so we can delete the policies afterwards
+ final Set<Resource> snippetResources = new HashSet<>();
+ snippet.getProcessors().keySet().forEach(id -> snippetResources.add(processorDAO.getProcessor(id).getResource()));
+ snippet.getInputPorts().keySet().forEach(id -> snippetResources.add(inputPortDAO.getPort(id).getResource()));
+ snippet.getOutputPorts().keySet().forEach(id -> snippetResources.add(outputPortDAO.getPort(id).getResource()));
+ snippet.getFunnels().keySet().forEach(id -> snippetResources.add(funnelDAO.getFunnel(id).getResource()));
+ snippet.getLabels().keySet().forEach(id -> snippetResources.add(labelDAO.getLabel(id).getResource()));
+ snippet.getRemoteProcessGroups().keySet().forEach(id -> snippetResources.add(remoteProcessGroupDAO.getRemoteProcessGroup(id).getResource()));
+ snippet.getProcessGroups().keySet().forEach(id -> {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id);
+
+ // add the process group
+ snippetResources.add(processGroup.getResource());
+
+ // add each encapsulated component
+ processGroup.findAllProcessors().forEach(processor -> snippetResources.add(processor.getResource()));
+ processGroup.findAllInputPorts().forEach(inputPort -> snippetResources.add(inputPort.getResource()));
+ processGroup.findAllOutputPorts().forEach(outputPort -> snippetResources.add(outputPort.getResource()));
+ processGroup.findAllFunnels().forEach(funnel -> snippetResources.add(funnel.getResource()));
+ processGroup.findAllLabels().forEach(label -> snippetResources.add(label.getResource()));
+ processGroup.findAllProcessGroups().forEach(childGroup -> snippetResources.add(childGroup.getResource()));
+ processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> snippetResources.add(remoteProcessGroup.getResource()));
+ processGroup.findAllTemplates().forEach(template -> snippetResources.add(template.getResource()));
+ processGroup.findAllControllerServices().forEach(controllerService -> snippetResources.add(controllerService.getResource()));
+ });
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final RevisionClaim claim = new StandardRevisionClaim(revisions);
+ final SnippetDTO dto = revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<SnippetDTO>() {
+ @Override
+ public SnippetDTO performTask() {
+ // delete the components in the snippet
+ snippetDAO.deleteSnippetComponents(snippetId);
+
+ // drop the snippet
+ snippetDAO.dropSnippet(snippetId);
+
+ // save
+ controllerFacade.save();
+
+ // create the dto for the snippet that was just removed
+ return dtoFactory.createSnippetDto(snippet);
+ }
+ });
+
+ // clean up component policies
+ snippetResources.forEach(resource -> cleanUpPolicies(resource));
+
+ return entityFactory.createSnippetEntity(dto);
+ }
+
+ @Override
+ public PortEntity deleteInputPort(final Revision revision, final String inputPortId) {
+ final Port port = inputPortDAO.getPort(inputPortId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
+ final PortDTO snapshot = deleteComponent(
+ revision,
+ port.getResource(),
+ () -> inputPortDAO.deletePort(inputPortId),
+ true,
+ dtoFactory.createPortDto(port));
+
+ return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null);
+ }
+
+ @Override
+ public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) {
+ final Port port = outputPortDAO.getPort(outputPortId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
+ final PortDTO snapshot = deleteComponent(
+ revision,
+ port.getResource(),
+ () -> outputPortDAO.deletePort(outputPortId),
+ true,
+ dtoFactory.createPortDto(port));
+
+ return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null);
+ }
+
+ @Override
+ public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+
+ // grab the resources in the snippet so we can delete the policies afterwards
+ final Set<Resource> groupResources = new HashSet<>();
+ processGroup.findAllProcessors().forEach(processor -> groupResources.add(processor.getResource()));
+ processGroup.findAllInputPorts().forEach(inputPort -> groupResources.add(inputPort.getResource()));
+ processGroup.findAllOutputPorts().forEach(outputPort -> groupResources.add(outputPort.getResource()));
+ processGroup.findAllFunnels().forEach(funnel -> groupResources.add(funnel.getResource()));
+ processGroup.findAllLabels().forEach(label -> groupResources.add(label.getResource()));
+ processGroup.findAllProcessGroups().forEach(childGroup -> groupResources.add(childGroup.getResource()));
+ processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> groupResources.add(remoteProcessGroup.getResource()));
+ processGroup.findAllTemplates().forEach(template -> groupResources.add(template.getResource()));
+ processGroup.findAllControllerServices().forEach(controllerService -> groupResources.add(controllerService.getResource()));
+
+ final ProcessGroupDTO snapshot = deleteComponent(
+ revision,
+ processGroup.getResource(),
+ () -> processGroupDAO.deleteProcessGroup(groupId),
+ true,
+ dtoFactory.createProcessGroupDto(processGroup));
+
+ // delete all applicable component policies
+ groupResources.forEach(groupResource -> cleanUpPolicies(groupResource));
+
+ return entityFactory.createProcessGroupEntity(snapshot, null, permissions, null, null);
+ }
+
+ @Override
+ public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) {
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup));
+ final RemoteProcessGroupDTO snapshot = deleteComponent(
+ revision,
+ remoteProcessGroup.getResource(),
+ () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId),
+ true,
+ dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+
+ return entityFactory.createRemoteProcessGroupEntity(snapshot, null, permissions, operatePermissions, null, null);
+ }
+
+ @Override
+ public void deleteTemplate(final String id) {
+ // delete the template and save the flow
+ templateDAO.deleteTemplate(id);
+ controllerFacade.save();
+ }
+
+ @Override
+ public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
+ final RevisionUpdate<ConnectionDTO> snapshot = createComponent(
+ revision,
+ connectionDTO,
+ () -> connectionDAO.createConnection(groupId, connectionDTO),
+ connection -> dtoFactory.createConnectionDto(connection));
+
+ final Connection connection = connectionDAO.getConnection(connectionDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
+ final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId()));
+ return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status);
+ }
+
+ @Override
+ public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) {
+ return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId));
+ }
+
+ @Override
+ public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) {
+ final Connection connection = connectionDAO.getConnection(connectionId);
+ final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(connectionId, listingRequestId));
+
+ // include whether the source and destination are running
+ if (connection.getSource() != null) {
+ listRequest.setSourceRunning(connection.getSource().isRunning());
+ }
+ if (connection.getDestination() != null) {
+ listRequest.setDestinationRunning(connection.getDestination().isRunning());
+ }
+
+ return listRequest;
+ }
+
+ @Override
+ public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
+ final RevisionUpdate<ProcessorDTO> snapshot = createComponent(
+ revision,
+ processorDTO,
+ () -> processorDAO.createProcessor(groupId, processorDTO),
+ processor -> {
+ awaitValidationCompletion(processor);
+ return dtoFactory.createProcessorDto(processor);
+ });
+
+ final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
+ final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
+ final RevisionUpdate<LabelDTO> snapshot = createComponent(
+ revision,
+ labelDTO,
+ () -> labelDAO.createLabel(groupId, labelDTO),
+ label -> dtoFactory.createLabelDto(label));
+
+ final Label label = labelDAO.getLabel(labelDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
+ return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
+ }
+
+ /**
+ * Creates a component using the optimistic locking manager.
+ *
+ * @param componentDto the DTO that will be used to create the component
+ * @param daoCreation A Supplier that will create the NiFi Component to use
+ * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO
+ * @param <D> the DTO Type
+ * @param <C> the NiFi Component Type
+ * @return a RevisionUpdate that represents the updated configuration
+ */
+ private <D, C> RevisionUpdate<D> createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier<C> daoCreation, final Function<C, D> dtoCreation) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ // read lock on the containing group
+ // request claim for component to be created... revision already verified (version == 0)
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+ // update revision through revision manager
+ return revisionManager.updateRevision(claim, user, () -> {
+ // add the component
+ final C component = daoCreation.get();
+
+ // save the flow
+ controllerFacade.save();
+
+ final D dto = dtoCreation.apply(component);
+ final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastMod);
+ });
+ }
+
+ @Override
+ public BulletinEntity createBulletin(final BulletinDTO bulletinDTO, final Boolean canRead){
+ final Bulletin bulletin = BulletinFactory.createBulletin(bulletinDTO.getCategory(),bulletinDTO.getLevel(),bulletinDTO.getMessage());
+ bulletinRepository.addBulletin(bulletin);
+ return entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin),canRead);
+ }
+
+ @Override
+ public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
+ final RevisionUpdate<FunnelDTO> snapshot = createComponent(
+ revision,
+ funnelDTO,
+ () -> funnelDAO.createFunnel(groupId, funnelDTO),
+ funnel -> dtoFactory.createFunnelDto(funnel));
+
+ final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
+ return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
+ }
+
+ @Override
+ public AccessPolicyEntity createAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) {
+ final Authorizable tenantAuthorizable = authorizableLookup.getTenant();
+ final String creator = NiFiUserUtils.getNiFiUserIdentity();
+
+ final AccessPolicy newAccessPolicy = accessPolicyDAO.createAccessPolicy(accessPolicyDTO);
+ final ComponentReferenceEntity componentReference = createComponentReferenceEntity(newAccessPolicy.getResource());
+ final AccessPolicyDTO newAccessPolicyDto = dtoFactory.createAccessPolicyDto(newAccessPolicy,
+ newAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()),
+ newAccessPolicy.getUsers().stream().map(userId -> {
+ final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId));
+ return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(userDAO.getUser(userId)), userRevision,
+ dtoFactory.createPermissionsDto(tenantAuthorizable));
+ }).collect(Collectors.toSet()), componentReference);
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId()));
+ return entityFactory.createAccessPolicyEntity(newAccessPolicyDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
+ }
+
+ @Override
+ public UserEntity createUser(final Revision revision, final UserDTO userDTO) {
+ final String creator = NiFiUserUtils.getNiFiUserIdentity();
+ final User newUser = userDAO.createUser(userDTO);
+ final Set<TenantEntity> tenantEntities = userGroupDAO.getUserGroupsForUser(newUser.getIdentifier()).stream()
+ .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
+ final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUser(newUser.getIdentifier()).stream()
+ .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
+ final UserDTO newUserDto = dtoFactory.createUserDto(newUser, tenantEntities, policyEntities);
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
+ return entityFactory.createUserEntity(newUserDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
+ }
+
+ private ComponentReferenceEntity createComponentReferenceEntity(final String resource) {
+ ComponentReferenceEntity componentReferenceEntity = null;
+ try {
+ // get the component authorizable
+ Authorizable componentAuthorizable = authorizableLookup.getAuthorizableFromResource(resource);
+
+ // if this represents an authorizable whose policy permissions are enforced through the base resource,
+ // get the underlying base authorizable for the component reference
+ if (componentAuthorizable instanceof EnforcePolicyPermissionsThroughBaseResource) {
+ componentAuthorizable = ((EnforcePolicyPermissionsThroughBaseResource) componentAuthorizable).getBaseAuthorizable();
+ }
+
+ final ComponentReferenceDTO componentReference = dtoFactory.createComponentReferenceDto(componentAuthorizable);
+ if (componentReference != null) {
+ final PermissionsDTO componentReferencePermissions = dtoFactory.createPermissionsDto(componentAuthorizable);
+ final RevisionDTO componentReferenceRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(componentReference.getId()));
+ componentReferenceEntity = entityFactory.createComponentReferenceEntity(componentReference, componentReferenceRevision, componentReferencePermissions);
+ }
+ } catch (final ResourceNotFoundException e) {
+ // component not found for the specified resource
+ }
+
+ return componentReferenceEntity;
+ }
+
+ private AccessPolicySummaryEntity createAccessPolicySummaryEntity(final AccessPolicy ap) {
+ final ComponentReferenceEntity componentReference = createComponentReferenceEntity(ap.getResource());
+ final AccessPolicySummaryDTO apSummary = dtoFactory.createAccessPolicySummaryDto(ap, componentReference);
+ final PermissionsDTO apPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(ap.getIdentifier()));
+ final RevisionDTO apRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(ap.getIdentifier()));
+ return entityFactory.createAccessPolicySummaryEntity(apSummary, apRevision, apPermissions);
+ }
+
+ @Override
+ public UserGroupEntity createUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) {
+ final String creator = NiFiUserUtils.getNiFiUserIdentity();
+ final Group newUserGroup = userGroupDAO.createUserGroup(userGroupDTO);
+ final Set<TenantEntity> tenantEntities = newUserGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
+ final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(newUserGroup.getIdentifier()).stream()
+ .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
+ final UserGroupDTO newUserGroupDto = dtoFactory.createUserGroupDto(newUserGroup, tenantEntities, policyEntities);
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
+ return entityFactory.createUserGroupEntity(newUserGroupDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
+ }
+
+ private void validateSnippetContents(final FlowSnippetDTO flow) {
+ // validate any processors
+ if (flow.getProcessors() != null) {
+ for (final ProcessorDTO processorDTO : flow.getProcessors()) {
+ final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId());
+ processorDTO.setValidationStatus(processorNode.getValidationStatus().name());
+
+ final Collection<ValidationResult> validationErrors = processorNode.getValidationErrors();
+ if (validationErrors != null && !validationErrors.isEmpty()) {
+ final List<String> errors = new ArrayList<>();
+ for (final ValidationResult validationResult : validationErrors) {
+ errors.add(validationResult.toString());
+ }
+ processorDTO.setValidationErrors(errors);
+ }
+ }
+ }
+
+ if (flow.getInputPorts() != null) {
+ for (final PortDTO portDTO : flow.getInputPorts()) {
+ final Port port = inputPortDAO.getPort(portDTO.getId());
+ final Collection<ValidationResult> validationErrors = port.getValidationErrors();
+ if (validationErrors != null && !validationErrors.isEmpty()) {
+ final List<String> errors = new ArrayList<>();
+ for (final ValidationResult validationResult : validationErrors) {
+ errors.add(validationResult.toString());
+ }
+ portDTO.setValidationErrors(errors);
+ }
+ }
+ }
+
+ if (flow.getOutputPorts() != null) {
+ for (final PortDTO portDTO : flow.getOutputPorts()) {
+ final Port port = outputPortDAO.getPort(portDTO.getId());
+ final Collection<ValidationResult> validationErrors = port.getValidationErrors();
+ if (validationErrors != null && !validationErrors.isEmpty()) {
+ final List<String> errors = new ArrayList<>();
+ for (final ValidationResult validationResult : validationErrors) {
+ errors.add(validationResult.toString());
+ }
+ portDTO.setValidationErrors(errors);
+ }
+ }
+ }
+
+ // get any remote process group issues
+ if (flow.getRemoteProcessGroups() != null) {
+ for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) {
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
+
+ if (remoteProcessGroup.getAuthorizationIssue() != null) {
+ remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue()));
+ }
+ }
+ }
+ }
+
+ @Override
+ public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) {
+ // create the new snippet
+ final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed);
+
+ // save the flow
+ controllerFacade.save();
+
+ // drop the snippet
+ snippetDAO.dropSnippet(snippetId);
+
+ // post process new flow snippet
+ final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet);
+
+ final FlowEntity flowEntity = new FlowEntity();
+ flowEntity.setFlow(flowDto);
+ return flowEntity;
+ }
+
+ @Override
+ public SnippetEntity createSnippet(final SnippetDTO snippetDTO) {
+ // add the component
+ final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
+
+ // save the flow
+ controllerFacade.save();
+
+ final SnippetDTO dto = dtoFactory.createSnippetDto(snippet);
+ final RevisionUpdate<SnippetDTO> snapshot = new StandardRevisionUpdate<>(dto, null);
+
+ return entityFactory.createSnippetEntity(snapshot.getComponent());
+ }
+
+ @Override
+ public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
+ final RevisionUpdate<PortDTO> snapshot = createComponent(
+ revision,
+ inputPortDTO,
+ () -> inputPortDAO.createPort(groupId, inputPortDTO),
+ port -> dtoFactory.createPortDto(port));
+
+ final Port port = inputPortDAO.getPort(inputPortDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
+ final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
+ final RevisionUpdate<PortDTO> snapshot = createComponent(
+ revision,
+ outputPortDTO,
+ () -> outputPortDAO.createPort(groupId, outputPortDTO),
+ port -> dtoFactory.createPortDto(port));
+
+ final Port port = outputPortDAO.getPort(outputPortDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
+ final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public ProcessGroupEntity createProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
+ final RevisionUpdate<ProcessGroupDTO> snapshot = createComponent(
+ revision,
+ processGroupDTO,
+ () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO),
+ processGroup -> dtoFactory.createProcessGroupDto(processGroup));
+
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+ final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities);
+ }
+
+ @Override
+ public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
+ final RevisionUpdate<RemoteProcessGroupDTO> snapshot = createComponent(
+ revision,
+ remoteProcessGroupDTO,
+ () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO),
+ remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup));
+ final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()),
+ permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) {
+ final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+ RemoteGroupPort port = rpg.getInputPort(remotePortId);
+ if (port != null) {
+ return port.hasIncomingConnection();
+ }
+
+ port = rpg.getOutputPort(remotePortId);
+ if (port != null) {
+ return !port.getConnections().isEmpty();
+ }
+
+ throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId);
+ }
+
+ @Override
+ public void verifyCanAddTemplate(String groupId, String name) {
+ templateDAO.verifyCanAddTemplate(name, groupId);
+ }
+
+ @Override
+ public void verifyComponentTypes(FlowSnippetDTO snippet) {
+ templateDAO.verifyComponentTypes(snippet);
+ }
+
+ @Override
+ public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) {
+ controllerFacade.verifyComponentTypes(versionedGroup);
+ }
+
+ @Override
+ public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+ verifyImportProcessGroup(versionControlInfo, contents, group);
+ }
+
+ private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) {
+ if (group == null) {
+ return;
+ }
+
+ final VersionControlInformation vci = group.getVersionControlInformation();
+ if (vci != null) {
+ // Note that we do not compare the Registry ID here because there could be two registry clients
+ // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance)..
+ if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
+ && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) {
+
+ throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
+ + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A.");
+ }
+ }
+
+ final Set<VersionedProcessGroup> childGroups = contents.getProcessGroups();
+ if (childGroups != null) {
+ for (final VersionedProcessGroup childGroup : childGroups) {
+ final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates();
+ if (childCoordinates != null) {
+ final VersionControlInformationDTO childVci = new VersionControlInformationDTO();
+ childVci.setBucketId(childCoordinates.getBucketId());
+ childVci.setFlowId(childCoordinates.getFlowId());
+ verifyImportProcessGroup(childVci, childGroup, group);
+ }
+ }
+ }
+
+ verifyImportProcessGroup(vciDto, contents, group.getParent());
+ }
+
+ @Override
+ public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) {
+ // get the specified snippet
+ final Snippet snippet = snippetDAO.getSnippet(snippetId);
+
+ // create the template
+ final TemplateDTO templateDTO = new TemplateDTO();
+ templateDTO.setName(name);
+ templateDTO.setDescription(description);
+ templateDTO.setTimestamp(new Date());
+ templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true, true));
+ templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION);
+
+ // set the id based on the specified seed
+ final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
+ templateDTO.setId(uuid);
+
+ // create the template
+ final Template template = templateDAO.createTemplate(templateDTO, groupId);
+
+ // drop the snippet
+ snippetDAO.dropSnippet(snippetId);
+
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createTemplateDTO(template);
+ }
+
+ /**
+ * Ensures default values are populated for all components in this snippet. This is necessary to handle old templates without default values
+ * and when existing properties have default values introduced.
+ *
+ * @param snippet snippet
+ */
+ private void ensureDefaultPropertyValuesArePopulated(final FlowSnippetDTO snippet) {
+ if (snippet != null) {
+ if (snippet.getControllerServices() != null) {
+ snippet.getControllerServices().forEach(dto -> {
+ if (dto.getProperties() == null) {
+ dto.setProperties(new LinkedHashMap<>());
+ }
+
+ try {
+ final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle());
+ configurableComponent.getPropertyDescriptors().forEach(descriptor -> {
+ if (dto.getProperties().get(descriptor.getName()) == null) {
+ dto.getProperties().put(descriptor.getName(), descriptor.getDefaultValue());
+ }
+ });
+ } catch (final Exception e) {
+ logger.warn(String.format("Unable to create ControllerService of type %s to populate default values.", dto.getType()));
+ }
+ });
+ }
+
+ if (snippet.getProcessors() != null) {
+ snippet.getProcessors().forEach(dto -> {
+ if (dto.getConfig() == null) {
+ dto.setConfig(new ProcessorConfigDTO());
+ }
+
+ final ProcessorConfigDTO config = dto.getConfig();
+ if (config.getProperties() == null) {
+ config.setProperties(new LinkedHashMap<>());
+ }
+
+ try {
+ final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle());
+ configurableComponent.getPropertyDescriptors().forEach(descriptor -> {
+ if (config.getProperties().get(descriptor.getName()) == null) {
+ config.getProperties().put(descriptor.getName(), descriptor.getDefaultValue());
+ }
+ });
+ } catch (final Exception e) {
+ logger.warn(String.format("Unable to create Processor of type %s to populate default values.", dto.getType()));
+ }
+ });
+ }
+
+ if (snippet.getProcessGroups() != null) {
+ snippet.getProcessGroups().forEach(processGroup -> {
+ ensureDefaultPropertyValuesArePopulated(processGroup.getContents());
+ });
+ }
+ }
+ }
+
+ @Override
+ public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional<String> idGenerationSeed) {
+ // ensure id is set
+ final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
+ templateDTO.setId(uuid);
+
+ // mark the timestamp
+ templateDTO.setTimestamp(new Date());
+
+ // ensure default values are populated
+ ensureDefaultPropertyValuesArePopulated(templateDTO.getSnippet());
+
+ // import the template
+ final Template template = templateDAO.importTemplate(templateDTO, groupId);
+
+ // save the flow
+ controllerFacade.save();
+
+ // return the template dto
+ return dtoFactory.createTemplateDTO(template);
+ }
+
+ /**
+ * Post processes a new flow snippet including validation, removing the snippet, and DTO conversion.
+ *
+ * @param groupId group id
+ * @param snippet snippet
+ * @return flow dto
+ */
+ private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) {
+ // validate the new snippet
+ validateSnippetContents(snippet);
+
+ // identify all components added
+ final Set<String> identifiers = new HashSet<>();
+ snippet.getProcessors().stream()
+ .map(proc -> proc.getId())
+ .forEach(id -> identifiers.add(id));
+ snippet.getConnections().stream()
+ .map(conn -> conn.getId())
+ .forEach(id -> identifiers.add(id));
+ snippet.getInputPorts().stream()
+ .map(port -> port.getId())
+ .forEach(id -> identifiers.add(id));
+ snippet.getOutputPorts().stream()
+ .map(port -> port.getId())
+ .forEach(id -> identifiers.add(id));
+ snippet.getProcessGroups().stream()
+ .map(group -> group.getId())
+ .forEach(id -> identifiers.add(id));
+ snippet.getRemoteProcessGroups().stream()
+ .map(remoteGroup -> remoteGroup.getId())
+ .forEach(id -> identifiers.add(id));
+ snippet.getRemoteProcessGroups().stream()
+ .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null)
+ .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream())
+ .map(remoteInputPort -> remoteInputPort.getId())
+ .forEach(id -> identifiers.add(id));
+ snippet.getRemoteProcessGroups().stream()
+ .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null)
+ .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream())
+ .map(remoteOutputPort -> remoteOutputPort.getId())
+ .forEach(id -> identifiers.add(id));
+ snippet.getLabels().stream()
+ .map(label -> label.getId())
+ .forEach(id -> identifiers.add(id));
+
+ final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+ final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
+ return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins);
+ }
+
+ @Override
+ public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateEncodingVersion,
+ final FlowSnippetDTO requestSnippet, final String idGenerationSeed) {
+
+ // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
+ // was copied and this dto is only used to instantiate it's components (which as already completed)
+ final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateEncodingVersion, requestSnippet, idGenerationSeed);
+
+ // save the flow
+ controllerFacade.save();
+
+ // post process the new flow snippet
+ final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet);
+
+ final FlowEntity flowEntity = new FlowEntity();
+ flowEntity.setFlow(flowDto);
+ return flowEntity;
+ }
+
+ @Override
+ public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) {
+ controllerServiceDTO.setParentGroupId(groupId);
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ // request claim for component to be created... revision already verified (version == 0)
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+ final RevisionUpdate<ControllerServiceDTO> snapshot;
+ if (groupId == null) {
+ // update revision through revision manager
+ snapshot = revisionManager.updateRevision(claim, user, () -> {
+ // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock
+ // on the group. The Controller Service may or may not have a Process Group (it won't if it's controller-scoped).
+ final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
+ controllerFacade.save();
+
+ awaitValidationCompletion(controllerService);
+ final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService);
+
+ final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastMod);
+ });
+ } else {
+ snapshot = revisionManager.updateRevision(claim, user, () -> {
+ final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
+ controllerFacade.save();
+
+ awaitValidationCompletion(controllerService);
+ final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService);
+
+ final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastMod);
+ });
+ }
+
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
+ }
+
+ @Override
+ public ControllerServiceEntity updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
+ // get the component, ensure we have access to it, and perform the update request
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
+ final RevisionUpdate<ControllerServiceDTO> snapshot = updateComponent(revision,
+ controllerService,
+ () -> controllerServiceDAO.updateControllerService(controllerServiceDTO),
+ cs -> {
+ awaitValidationCompletion(cs);
+ final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(cs);
+ final ControllerServiceReference ref = controllerService.getReferences();
+ final ControllerServiceReferencingComponentsEntity referencingComponentsEntity =
+ createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerService.getIdentifier()));
+ dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
+ return dto;
+ });
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
+ }
+
+
+ @Override
+ public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents(
+ final Map<String, Revision> referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
+
+ final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values());
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, user,
+ new UpdateRevisionTask<ControllerServiceReferencingComponentsEntity>() {
+ @Override
+ public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
+ final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+ final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();
+
+ // get the revisions of the updated components
+ final Map<String, Revision> updatedRevisions = new HashMap<>();
+ for (final ComponentNode component : updated) {
+ final Revision currentRevision = revisionManager.getRevision(component.getIdentifier());
+ final Revision requestRevision = referenceRevisions.get(component.getIdentifier());
+ updatedRevisions.put(component.getIdentifier(), currentRevision.incrementRevision(requestRevision.getClientId()));
+ }
+
+ // ensure the revision for all referencing components is included regardless of whether they were updated in this request
+ for (final ComponentNode component : updatedReference.findRecursiveReferences(ComponentNode.class)) {
+ updatedRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
+ }
+
+ final ControllerServiceReferencingComponentsEntity entity = createControllerServiceReferencingComponentsEntity(updatedReference, updatedRevisions);
+ return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
+ }
+ });
+
+ return update.getComponent();
+ }
+
+ /**
+ * Finds the identifiers for all components referencing a ControllerService.
+ *
+ * @param reference ControllerServiceReference
+ * @param visited ControllerServices we've already visited
+ */
+ private void findControllerServiceReferencingComponentIdentifiers(final ControllerServiceReference reference, final Set<ControllerServiceNode> visited) {
+ for (final ComponentNode component : reference.getReferencingComponents()) {
+
+ // if this is a ControllerService consider it's referencing components
+ if (component instanceof ControllerServiceNode) {
+ final ControllerServiceNode node = (ControllerServiceNode) component;
+ if (!visited.contains(node)) {
+ visited.add(node);
+ findControllerServiceReferencingComponentIdentifiers(node.getReferences(), visited);
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates entities for components referencing a ControllerService using their current revision.
+ *
+ * @param reference ControllerServiceReference
+ * @return The entity
+ */
+ private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set<String> lockedIds) {
+ final Set<ControllerServiceNode> visited = new HashSet<>();
+ visited.add(reference.getReferencedComponent());
+ findControllerServiceReferencingComponentIdentifiers(reference, visited);
+
+ final Map<String, Revision> referencingRevisions = new HashMap<>();
+ for (final ComponentNode component : reference.getReferencingComponents()) {
+ referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
+ }
+
+ return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions);
+ }
+
+ /**
+ * Creates entities for components referencing a ControllerService using the specified revisions.
+ *
+ * @param reference ControllerServiceReference
+ * @param revisions The revisions
+ * @return The entity
+ */
+ private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(
+ final ControllerServiceReference reference, final Map<String, Revision> revisions) {
+ final Set<ControllerServiceNode> visited = new HashSet<>();
+ visited.add(reference.getReferencedComponent());
+ return createControllerServiceReferencingComponentsEntity(reference, revisions, visited);
+ }
+
+ /**
+ * Creates entities for components referencing a ControllerServcie using the specified revisions.
+ *
+ * @param reference ControllerServiceReference
+ * @param revisions The revisions
+ * @param visited Which services we've already considered (in case of cycle)
+ * @return The entity
+ */
+ private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(
+ final ControllerServiceReference reference, final Map<String, Revision> revisions, final Set<ControllerServiceNode> visited) {
+
+ final String modifier = NiFiUserUtils.getNiFiUserIdentity();
+ final Set<ComponentNode> referencingComponents = reference.getReferencingComponents();
+
+ final Set<ControllerServiceReferencingComponentEntity> componentEntities = new HashSet<>();
+ for (final ComponentNode refComponent : referencingComponents) {
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(refComponent);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(refComponent));
+
+ final Revision revision = revisions.get(refComponent.getIdentifier());
+ final FlowModification flowMod = new FlowModification(revision, modifier);
+ final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(flowMod);
+ final ControllerServiceReferencingComponentDTO dto = dtoFactory.createControllerServiceReferencingComponentDTO(refComponent);
+
+ if (refComponent instanceof ControllerServiceNode) {
+ final ControllerServiceNode node = (ControllerServiceNode) refComponent;
+
+ // indicate if we've hit a cycle
+ dto.setReferenceCycle(visited.contains(node));
+
+ // mark node as visited before building the reference cycle
+ visited.add(node);
+
+ // if we haven't encountered this service before include it's referencing components
+ if (!dto.getReferenceCycle()) {
+ final ControllerServiceReference refReferences = node.getReferences();
+ final Map<String, Revision> referencingRevisions = new HashMap<>(revisions);
+ for (final ComponentNode component : refReferences.getReferencingComponents()) {
+ referencingRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
+ }
+ final ControllerServiceReferencingComponentsEntity references = createControllerServiceReferencingComponentsEntity(refReferences, referencingRevisions, visited);
+ dto.setReferencingComponents(references.getControllerServiceReferencingComponents());
+ }
+ }
+
+ componentEntities.add(entityFactory.createControllerServiceReferencingComponentEntity(refComponent.getIdentifier(), dto, revisionDto, permissions, operatePermissions));
+ }
+
+ final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity();
+ entity.setControllerServiceReferencingComponents(componentEntities);
+ return entity;
+ }
+
+ @Override
+ public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) {
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
+ final ControllerServiceDTO snapshot = deleteComponent(
+ revision,
+ controllerService.getResource(),
+ () -> controllerServiceDAO.deleteControllerService(controllerServiceId),
+ true,
+ dtoFactory.createControllerServiceDto(controllerService));
+
+ return entityFactory.createControllerServiceEntity(snapshot, null, permissions, operatePermissions, null);
+ }
+
+
+ @Override
+ public RegistryClientEntity createRegistryClient(Revision revision, RegistryDTO registryDTO) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ // request claim for component to be created... revision already verified (version == 0)
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+ // update revision through revision manager
+ final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(claim, user, () -> {
+ // add the component
+ final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO);
+
+ // save the flow
+ controllerFacade.save();
+
+ final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
+ return new StandardRevisionUpdate<>(registry, lastMod);
+ });
+
+ final FlowRegistry registry = revisionUpdate.getComponent();
+ return createRegistryClientEntity(registry);
+ }
+
+ @Override
+ public RegistryClientEntity getRegistryClient(final String registryId) {
+ final FlowRegistry registry = registryDAO.getFlowRegistry(registryId);
+ return createRegistryClientEntity(registry);
+ }
+
+ private RegistryClientEntity createRegistryClientEntity(final FlowRegistry flowRegistry) {
+ if (flowRegistry == null) {
+ return null;
+ }
+
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(flowRegistry.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getController());
+ final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry);
+
+ return entityFactory.createRegistryClientEntity(dto, revision, permissions);
+ }
+
+ private VersionedFlowEntity createVersionedFlowEntity(final String registryId, final VersionedFlow versionedFlow) {
+ if (versionedFlow == null) {
+ return null;
+ }
+
+ final VersionedFlowDTO dto = new VersionedFlowDTO();
+ dto.setRegistryId(registryId);
+ dto.setBucketId(versionedFlow.getBucketIdentifier());
+ dto.setFlowId(versionedFlow.getIdentifier());
+ dto.setFlowName(versionedFlow.getName());
+ dto.setDescription(versionedFlow.getDescription());
+
+ final VersionedFlowEntity entity = new VersionedFlowEntity();
+ entity.setVersionedFlow(dto);
+
+ return entity;
+ }
+
+ private VersionedFlowSnapshotMetadataEntity createVersionedFlowSnapshotMetadataEntity(final String registryId, final VersionedFlowSnapshotMetadata metadata) {
+ if (metadata == null) {
+ return null;
+ }
+
+ final VersionedFlowSnapshotMetadataEntity entity = new VersionedFlowSnapshotMetadataEntity();
+ entity.setRegistryId(registryId);
+ entity.setVersionedFlowMetadata(metadata);
+
+ return entity;
+ }
+
+ @Override
+ public Set<RegistryClientEntity> getRegistryClients() {
+ return registryDAO.getFlowRegistries().stream()
+ .map(this::createRegistryClientEntity)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<RegistryEntity> getRegistriesForUser(final NiFiUser user) {
+ return registryDAO.getFlowRegistriesForUser(user).stream()
+ .map(flowRegistry -> entityFactory.createRegistryEntity(dtoFactory.createRegistryDto(flowRegistry)))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<BucketEntity> getBucketsForUser(final String registryId, final NiFiUser user) {
+ return registryDAO.getBucketsForUser(registryId, user).stream()
+ .map(bucket -> {
+ if (bucket == null) {
+ return null;
+ }
+
+ final BucketDTO dto = new BucketDTO();
+ dto.setId(bucket.getIdentifier());
+ dto.setName(bucket.getName());
+ dto.setDescription(bucket.getDescription());
+ dto.setCreated(bucket.getCreatedTimestamp());
+
+ final Permissions regPermissions = bucket.getPermissions();
+ final PermissionsDTO permissions = new PermissionsDTO();
+ permissions.setCanRead(regPermissions.getCanRead());
+ permissions.setCanWrite(regPermissions.getCanWrite());
+
+ return entityFactory.createBucketEntity(dto, permissions);
+ })
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<VersionedFlowEntity> getFlowsForUser(String registryId, String bucketId, NiFiUser user) {
+ return registryDAO.getFlowsForUser(registryId, bucketId, user).stream()
+ .map(vf -> createVersionedFlowEntity(registryId, vf))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<VersionedFlowSnapshotMetadataEntity> getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) {
+ return registryDAO.getFlowVersionsForUser(registryId, bucketId, flowId, user).stream()
+ .map(md -> createVersionedFlowSnapshotMetadataEntity(registryId, md))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public RegistryClientEntity updateRegistryClient(Revision revision, RegistryDTO registryDTO) {
+ final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId());
+ final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> {
+ final boolean duplicateName = registryDAO.getFlowRegistries().stream()
+ .anyMatch(reg -> reg.getName().equals(registryDTO.getName()) && !reg.getIdentifier().equals(registryDTO.getId()));
+
+ if (duplicateName) {
+ throw new IllegalStateException("Cannot update Flow Registry because a Flow Registry already exists with the name " + registryDTO.getName());
+ }
+
+ registry.setDescription(registryDTO.getDescription());
+ registry.setName(registryDTO.getName());
+ registry.setURL(registryDTO.getUri());
+
+ controllerFacade.save();
+
+ final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
+ final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
+
+ return new StandardRevisionUpdate<>(registry, lastModification);
+ });
+
+ final FlowRegistry updatedReg = revisionUpdate.getComponent();
+ return createRegistryClientEntity(updatedReg);
+ }
+
+ @Override
+ public void verifyDeleteRegistry(String registryId) {
+ processGroupDAO.verifyDeleteFlowRegistry(registryId);
+ }
+
+ @Override
+ public RegistryClientEntity deleteRegistryClient(final Revision revision, final String registryId) {
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> {
+ final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId);
+ controllerFacade.save();
+ return reg;
+ });
+
+ return createRegistryClientEntity(registry);
+ }
+
+ @Override
+ public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ // request claim for component to be created... revision already verified (version == 0)
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+ // update revision through revision manager
+ final RevisionUpdate<ReportingTaskDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
+ // create the reporting task
+ final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO);
+
+ // save the update
+ controllerFacade.save();
+ awaitValidationCompletion(reportingTask);
+
+ final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask);
+ final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastMod);
+ });
+
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
+ }
+
+ @Override
+ public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
+ // get the component, ensure we have access to it, and perform the update request
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
+ final RevisionUpdate<ReportingTaskDTO> snapshot = updateComponent(revision,
+ reportingTask,
+ () -> reportingTaskDAO.updateReportingTask(reportingTaskDTO),
+ rt -> {
+ awaitValidationCompletion(rt);
+ return dtoFactory.createReportingTaskDto(rt);
+ });
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
+ }
+
+ @Override
+ public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) {
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
+ final ReportingTaskDTO snapshot = deleteComponent(
+ revision,
+ reportingTask.getResource(),
+ () -> reportingTaskDAO.deleteReportingTask(reportingTaskId),
+ true,
+ dtoFactory.createReportingTaskDto(reportingTask));
+
+ return entityFactory.createReportingTaskEntity(snapshot, null, permissions, operatePermissions, null);
+ }
+
+ @Override
+ public void deleteActions(final Date endDate) {
+ // get the user from the request
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+ }
+
+ // create the purge details
+ final FlowChangePurgeDetails details = new FlowChangePurgeDetails();
+ details.setEndDate(endDate);
+
+ // create a purge action to record that records are being removed
+ final FlowChangeAction purgeAction = new FlowChangeAction();
+ purgeAction.setUserIdentity(user.getIdentity());
+ purgeAction.setOperation(Operation.Purge);
+ purgeAction.setTimestamp(new Date());
+ purgeAction.setSourceId("Flow Controller");
+ purgeAction.setSourceName("History");
+ purgeAction.setSourceType(Component.Controller);
+ purgeAction.setActionDetails(details);
+
+ // purge corresponding actions
+ auditService.purgeActions(endDate, purgeAction);
+ }
+
+ @Override
+ public ProvenanceDTO submitProvenance(final ProvenanceDTO query) {
+ return controllerFacade.submitProvenance(query);
+ }
+
+ @Override
+ public void deleteProvenance(final String queryId) {
+ controllerFacade.deleteProvenanceQuery(queryId);
+ }
+
+ @Override
+ public LineageDTO submitLineage(final LineageDTO lineage) {
+ return controllerFacade.submitLineage(lineage);
+ }
+
+ @Override
+ public void deleteLineage(final String lineageId) {
+ controllerFacade.deleteLineage(lineageId);
+ }
+
+ @Override
+ public ProvenanceEventDTO submitReplay(final Long eventId) {
+ return controllerFacade.submitReplay(eventId);
+ }
+
+ // -----------------------------------------
+ // Read Operations
+ // -----------------------------------------
+
+ @Override
+ public SearchResultsDTO searchController(final String query) {
+ return controllerFacade.search(query);
+ }
+
+ @Override
+ public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) {
+ return connectionDAO.getContent(connectionId, flowFileUuid, uri);
+ }
+
+ @Override
+ public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) {
+ return controllerFacade.getContent(eventId, uri, contentDirection);
+ }
+
+ @Override
+ public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) {
+ return controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults);
+ }
+
+ @Override
+ public LineageDTO getLineage(final String lineageId) {
+ return controllerFacade.getLineage(lineageId);
+ }
+
+ @Override
+ public ProvenanceOptionsDTO getProvenanceSearchOptions() {
+ return controllerFacade.getProvenanceSearchOptions();
+ }
+
+ @Override
+ public ProvenanceEventDTO getProvenanceEvent(final Long id) {
+ return controllerFacade.getProvenanceEvent(id);
+ }
+
+ @Override
+ public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+ final ProcessGroupStatusDTO dto = dtoFactory.createProcessGroupStatusDto(processGroup, controllerFacade.getProcessGroupStatus(groupId));
+
+ // prune the response as necessary
+ if (!recursive) {
+ pruneChildGroups(dto.getAggregateSnapshot());
+ if (dto.getNodeSnapshots() != null) {
+ for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : dto.getNodeSnapshots()) {
+ pruneChildGroups(nodeSnapshot.getStatusSnapshot());
+ }
+ }
+ }
+
+ return entityFactory.createProcessGroupStatusEntity(dto, permissions);
+ }
+
+ private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) {
+ for (final ProcessGroupStatusSnapshotEntity childProcessGroupStatusEntity : snapshot.getProcessGroupStatusSnapshots()) {
+ final ProcessGroupStatusSnapshotDTO childProcessGroupStatus = childProcessGroupStatusEntity.getProcessGroupStatusSnapshot();
+ childProcessGroupStatus.setConnectionStatusSnapshots(null);
+ childProcessGroupStatus.setProcessGroupStatusSnapshots(null);
+ childProcessGroupStatus.setInputPortStatusSnapshots(null);
+ childProcessGroupStatus.setOutputPortStatusSnapshots(null);
+ childProcessGroupStatus.setProcessorStatusSnapshots(null);
+ childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null);
+ }
+ }
+
+ @Override
+ public ControllerStatusDTO getControllerStatus() {
+ return controllerFacade.getControllerStatus();
+ }
+
+ @Override
+ public ComponentStateDTO getProcessorState(final String processorId) {
+ final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null;
+ final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL);
+
+ // processor will be non null as it was already found when getting the state
+ final ProcessorNode processor = processorDAO.getProcessor(processorId);
+ return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState);
+ }
+
+ @Override
+ public ComponentStateDTO getControllerServiceState(final String controllerServiceId) {
+ final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null;
+ final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL);
+
+ // controller service will be non null as it was already found when getting the state
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
+ return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState);
+ }
+
+ @Override
+ public ComponentStateDTO getReportingTaskState(final String reportingTaskId) {
+ final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null;
+ final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL);
+
+ // reporting task will be non null as it was already found when getting the state
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
+ return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState);
+ }
+
+ @Override
+ public CountersDTO getCounters() {
+ final List<Counter> counters = controllerFacade.getCounters();
+ final Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
+ for (final Counter counter : counters) {
+ counterDTOs.add(dtoFactory.createCounterDto(counter));
+ }
+
+ final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs);
+ final CountersDTO countersDto = new CountersDTO();
+ countersDto.setAggregateSnapshot(snapshotDto);
+
+ return countersDto;
+ }
+
+ private ConnectionEntity createConnectionEntity(final Connection connection) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
+ final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier()));
+ return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, permissions, status);
+ }
+
+ @Override
+ public Set<ConnectionEntity> getConnections(final String groupId) {
+ final Set<Connection> connections = connectionDAO.getConnections(groupId);
+ return connections.stream()
+ .map(connection -> createConnectionEntity(connection))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public ConnectionEntity getConnection(final String connectionId) {
+ final Connection connection = connectionDAO.getConnection(connectionId);
+ return createConnectionEntity(connection);
+ }
+
+ @Override
+ public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) {
+ return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId));
+ }
+
+ @Override
+ public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) {
+ final Connection connection = connectionDAO.getConnection(connectionId);
+ final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(connectionId, listingRequestId));
+
+ // include whether the source and destination are running
+ if (connection.getSource() != null) {
+ listRequest.setSourceRunning(connection.getSource().isRunning());
+ }
+ if (connection.getDestination() != null) {
+ listRequest.setDestinationRunning(connection.getDestination().isRunning());
+ }
+
+ return listRequest;
+ }
+
+ @Override
+ public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) {
+ return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid));
+ }
+
+ @Override
+ public ConnectionStatusEntity getConnectionStatus(final String connectionId) {
+ final Connection connection = connectionDAO.getConnection(connectionId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
+ final ConnectionStatusDTO dto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId));
+ return entityFactory.createConnectionStatusEntity(dto, permissions);
+ }
+
+ @Override
+ public StatusHistoryEntity getConnectionStatusHistory(final String connectionId) {
+ final Connection connection = connectionDAO.getConnection(connectionId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
+ final StatusHistoryDTO dto = controllerFacade.getConnectionStatusHistory(connectionId);
+ return entityFactory.createStatusHistoryEntity(dto, permissions);
+ }
+
+ private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
+ final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public Set<ProcessorEntity> getProcessors(final String groupId, final boolean includeDescendants) {
+ final Set<ProcessorNode> processors = processorDAO.getProcessors(groupId, includeDescendants);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ return processors.stream()
+ .map(processor -> createProcessorEntity(processor, user))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public TemplateDTO exportTemplate(final String id) {
+ final Template template = templateDAO.getTemplate(id);
+ final TemplateDTO templateDetails = template.getDetails();
+
+ final TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template);
+ templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet()));
+ return templateDTO;
+ }
+
+ @Override
+ public TemplateDTO getTemplate(final String id) {
+ return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id));
+ }
+
+ @Override
+ public Set<TemplateEntity> getTemplates() {
+ return templateDAO.getTemplates().stream()
+ .map(template -> {
+ final TemplateDTO dto = dtoFactory.createTemplateDTO(template);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(template);
+
+ final TemplateEntity entity = new TemplateEntity();
+ entity.setId(dto.getId());
+ entity.setPermissions(permissions);
+ entity.setTemplate(dto);
+ return entity;
+ }).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<DocumentedTypeDTO> getWorkQueuePrioritizerTypes() {
+ return controllerFacade.getFlowFileComparatorTypes();
+ }
+
+ @Override
+ public Set<DocumentedTypeDTO> getProcessorTypes(final String bundleGroup, final String bundleArtifact, final String type) {
+ return controllerFacade.getFlowFileProcessorTypes(bundleGroup, bundleArtifact, type);
+ }
+
+ @Override
+ public Set<DocumentedTypeDTO> getControllerServiceTypes(final String serviceType, final String serviceBundleGroup, final String serviceBundleArtifact, final String serviceBundleVersion,
+ final String bundleGroup, final String bundleArtifact, final String type) {
+ return controllerFacade.getControllerServiceTypes(serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion, bundleGroup, bundleArtifact, type);
+ }
+
+ @Override
+ public Set<DocumentedTypeDTO> getReportingTaskTypes(final String bundleGroup, final String bundleArtifact, final String type) {
+ return controllerFacade.getReportingTaskTypes(bundleGroup, bundleArtifact, type);
+ }
+
+ @Override
+ public ProcessorEntity getProcessor(final String id) {
+ final ProcessorNode processor = processorDAO.getProcessor(id);
+ return createProcessorEntity(processor, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) {
+ final ProcessorNode processor = processorDAO.getProcessor(id);
+ PropertyDescriptor descriptor = processor.getPropertyDescriptor(property);
+
+ // return an invalid descriptor if the processor doesn't support this property
+ if (descriptor == null) {
+ descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+ }
+
+ return dtoFactory.createPropertyDescriptorDto(descriptor, processor.getProcessGroup().getIdentifier());
+ }
+
+ @Override
+ public ProcessorStatusEntity getProcessorStatus(final String id) {
+ final ProcessorNode processor = processorDAO.getProcessor(id);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
+ final ProcessorStatusDTO dto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id));
+ return entityFactory.createProcessorStatusEntity(dto, permissions);
+ }
+
+ @Override
+ public StatusHistoryEntity getProcessorStatusHistory(final String id) {
+ final ProcessorNode processor = processorDAO.getProcessor(id);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
+ final StatusHistoryDTO dto = controllerFacade.getProcessorStatusHistory(id);
+ return entityFactory.createStatusHistoryEntity(dto, permissions);
+ }
+
+ private boolean authorizeBulletin(final Bulletin bulletin) {
+ final String sourceId = bulletin.getSourceId();
+ final ComponentType type = bulletin.getSourceType();
+
+ final Authorizable authorizable;
+ try {
+ switch (type) {
+ case PROCESSOR:
+ authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable();
+ break;
+ case REPORTING_TASK:
+ authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable();
+ break;
+ case CONTROLLER_SERVICE:
+ authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable();
+ break;
+ case FLOW_CONTROLLER:
+ authorizable = controllerFacade;
+ break;
+ case INPUT_PORT:
+ authorizable = authorizableLookup.getInputPort(sourceId);
+ break;
+ case OUTPUT_PORT:
+ authorizable = authorizableLookup.getOutputPort(sourceId);
+ break;
+ case REMOTE_PROCESS_GROUP:
+ authorizable = authorizableLookup.getRemoteProcessGroup(sourceId);
+ break;
+ default:
+ throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build());
+ }
+ } catch (final ResourceNotFoundException e) {
+ // if the underlying component is gone, disallow
+ return false;
+ }
+
+ // perform the authorization
+ final AuthorizationResult result = authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ return Result.Approved.equals(result.getResult());
+ }
+
+ @Override
+ public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) {
+ // build the query
+ final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder()
+ .groupIdMatches(query.getGroupId())
+ .sourceIdMatches(query.getSourceId())
+ .nameMatches(query.getName())
+ .messageMatches(query.getMessage())
+ .after(query.getAfter())
+ .limit(query.getLimit());
+
+ // perform the query
+ final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build());
+
+ // perform the query and generate the results - iterating in reverse order since we are
+ // getting the most recent results by ordering by timestamp desc above. this gets the
+ // exact results we want but in reverse order
+ final List<BulletinEntity> bulletinEntities = new ArrayList<>();
+ for (final ListIterator<Bulletin> bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious(); ) {
+ final Bulletin bulletin = bulletinIter.previous();
+ bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin)));
+ }
+
+ // create the bulletin board
+ final BulletinBoardDTO bulletinBoard = new BulletinBoardDTO();
+ bulletinBoard.setBulletins(bulletinEntities);
+ bulletinBoard.setGenerated(new Date());
+ return bulletinBoard;
+ }
+
+ @Override
+ public SystemDiagnosticsDTO getSystemDiagnostics() {
+ final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics();
+ return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics);
+ }
+
+ @Override
+ public List<ResourceDTO> getResources() {
+ final List<Resource> resources = controllerFacade.getResources();
+ final List<ResourceDTO> resourceDtos = new ArrayList<>(resources.size());
+ for (final Resource resource : resources) {
+ resourceDtos.add(dtoFactory.createResourceDto(resource));
+ }
+ return resourceDtos;
+ }
+
+ @Override
+ public void discoverCompatibleBundles(VersionedProcessGroup versionedGroup) {
+ BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), versionedGroup);
+ }
+
+ @Override
+ public BundleCoordinate getCompatibleBundle(String type, BundleDTO bundleDTO) {
+ return BundleUtils.getCompatibleBundle(controllerFacade.getExtensionManager(), type, bundleDTO);
+ }
+
+ @Override
+ public ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate) {
+ return controllerFacade.getExtensionManager().getTempComponent(classType, bundleCoordinate);
+ }
+
+ /**
+ * Ensures the specified user has permission to access the specified port. This method does
+ * not utilize the DataTransferAuthorizable as that will enforce the entire chain is
+ * authorized for the transfer. This method is only invoked when obtaining the site to site
+ * details so the entire chain isn't necessary.
+ */
+ private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) {
+ final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure());
+
+ // if site to site is not secure, allow all users
+ if (!isSiteToSiteSecure) {
+ return true;
+ }
+
+ final Map<String, String> userContext;
+ if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
+ userContext = new HashMap<>();
+ userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
+ } else {
+ userContext = null;
+ }
+
+ final AuthorizationRequest request = new AuthorizationRequest.Builder()
+ .resource(ResourceFactory.getDataTransferResource(port.getResource()))
+ .identity(user.getIdentity())
+ .groups(user.getGroups())
+ .anonymous(user.isAnonymous())
+ .accessAttempt(false)
+ .action(RequestAction.WRITE)
+ .userContext(userContext)
+ .explanationSupplier(() -> "Unable to retrieve port details.")
+ .build();
+
+ final AuthorizationResult result = authorizer.authorize(request);
+ return Result.Approved.equals(result.getResult());
+ }
+
+ @Override
+ public ControllerDTO getSiteToSiteDetails() {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+ }
+
+ // serialize the input ports this NiFi has access to
+ final Set<PortDTO> inputPortDtos = new LinkedHashSet<>();
+ final Set<RootGroupPort> inputPorts = controllerFacade.getInputPorts();
+ for (final RootGroupPort inputPort : inputPorts) {
+ if (isUserAuthorized(user, inputPort)) {
+ final PortDTO dto = new PortDTO();
+ dto.setId(inputPort.getIdentifier());
+ dto.setName(inputPort.getName());
+ dto.setComments(inputPort.getComments());
+ dto.setState(inputPort.getScheduledState().toString());
+ inputPortDtos.add(dto);
+ }
+ }
+
+ // serialize the output ports this NiFi has access to
+ final Set<PortDTO> outputPortDtos = new LinkedHashSet<>();
+ for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) {
+ if (isUserAuthorized(user, outputPort)) {
+ final PortDTO dto = new PortDTO();
+ dto.setId(outputPort.getIdentifier());
+ dto.setName(outputPort.getName());
+ dto.setComments(outputPort.getComments());
+ dto.setState(outputPort.getScheduledState().toString());
+ outputPortDtos.add(dto);
+ }
+ }
+
+ // get the root group
+ final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
+ final ProcessGroupCounts counts = rootGroup.getCounts();
+
+ // create the controller dto
+ final ControllerDTO controllerDTO = new ControllerDTO();
+ controllerDTO.setId(controllerFacade.getRootGroupId());
+ controllerDTO.setInstanceId(controllerFacade.getInstanceId());
+ controllerDTO.setName(controllerFacade.getName());
+ controllerDTO.setComments(controllerFacade.getComments());
+ controllerDTO.setInputPorts(inputPortDtos);
+ controllerDTO.setOutputPorts(outputPortDtos);
+ controllerDTO.setInputPortCount(inputPortDtos.size());
+ controllerDTO.setOutputPortCount(outputPortDtos.size());
+ controllerDTO.setRunningCount(counts.getRunningCount());
+ controllerDTO.setStoppedCount(counts.getStoppedCount());
+ controllerDTO.setInvalidCount(counts.getInvalidCount());
+ controllerDTO.setDisabledCount(counts.getDisabledCount());
+
+ // determine the site to site configuration
+ controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
+ controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
+ controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
+
+ return controllerDTO;
+ }
+
+ @Override
+ public ControllerConfigurationEntity getControllerConfiguration() {
+ final Revision rev = revisionManager.getRevision(FlowController.class.getSimpleName());
+ final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade);
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(rev);
+ return entityFactory.createControllerConfigurationEntity(dto, revision, permissions);
+ }
+
+ @Override
+ public ControllerBulletinsEntity getControllerBulletins() {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity();
+
+ final List<BulletinEntity> controllerBulletinEntities = new ArrayList<>();
+
+ final Authorizable controllerAuthorizable = authorizableLookup.getController();
+ final boolean authorized = controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController());
+ controllerBulletinEntities.addAll(bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList()));
+
+ // get the controller service bulletins
+ final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
+ final List<Bulletin> allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery);
+ final List<BulletinEntity> controllerServiceBulletinEntities = new ArrayList<>();
+ for (final Bulletin bulletin : allControllerServiceBulletins) {
+ try {
+ final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable();
+ final boolean controllerServiceAuthorized = controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
+
+ final BulletinEntity controllerServiceBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized);
+ controllerServiceBulletinEntities.add(controllerServiceBulletin);
+ controllerBulletinEntities.add(controllerServiceBulletin);
+ } catch (final ResourceNotFoundException e) {
+ // controller service missing.. skip
+ }
+ }
+ controllerBulletinsEntity.setControllerServiceBulletins(controllerServiceBulletinEntities);
+
+ // get the reporting task bulletins
+ final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
+ final List<Bulletin> allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery);
+ final List<BulletinEntity> reportingTaskBulletinEntities = new ArrayList<>();
+ for (final Bulletin bulletin : allReportingTaskBulletins) {
+ try {
+ final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable();
+ final boolean reportingTaskAuthorizableAuthorized = reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
+
+ final BulletinEntity reportingTaskBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized);
+ reportingTaskBulletinEntities.add(reportingTaskBulletin);
+ controllerBulletinEntities.add(reportingTaskBulletin);
+ } catch (final ResourceNotFoundException e) {
+ // reporting task missing.. skip
+ }
+ }
+ controllerBulletinsEntity.setReportingTaskBulletins(reportingTaskBulletinEntities);
+
+ controllerBulletinsEntity.setBulletins(pruneAndSortBulletins(controllerBulletinEntities, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER));
+ return controllerBulletinsEntity;
+ }
+
+ @Override
+ public FlowConfigurationEntity getFlowConfiguration() {
+ final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval(),
+ properties.getDefaultBackPressureObjectThreshold(), properties.getDefaultBackPressureDataSizeThreshold(),properties.getDcaeDistributorApiHostname());
+ final FlowConfigurationEntity entity = new FlowConfigurationEntity();
+ entity.setFlowConfiguration(dto);
+ return entity;
+ }
+
+ @Override
+ public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) {
+ final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId);
+ return createAccessPolicyEntity(accessPolicy);
+ }
+
+ @Override
+ public AccessPolicyEntity getAccessPolicy(final RequestAction requestAction, final String resource) {
+ Authorizable authorizable;
+ try {
+ authorizable = authorizableLookup.getAuthorizableFromResource(resource);
+ } catch (final ResourceNotFoundException e) {
+ // unable to find the underlying authorizable... user authorized based on top level /policies... create
+ // an anonymous authorizable to attempt to locate an existing policy for this resource
+ authorizable = new Authorizable() {
+ @Override
+ public Authorizable getParentAuthorizable() {
+ return null;
+ }
+
+ @Override
+ public Resource getResource() {
+ return new Resource() {
+ @Override
+ public String getIdentifier() {
+ return resource;
+ }
+
+ @Override
+ public String getName() {
+ return resource;
+ }
+
+ @Override
+ public String getSafeDescription() {
+ return "Policy " + resource;
+ }
+ };
+ }
+ };
+ }
+
+ final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(requestAction, authorizable);
+ return createAccessPolicyEntity(accessPolicy);
+ }
+
+ private AccessPolicyEntity createAccessPolicyEntity(final AccessPolicy accessPolicy) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicy.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicy.getIdentifier()));
+ final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
+ return entityFactory.createAccessPolicyEntity(
+ dtoFactory.createAccessPolicyDto(accessPolicy,
+ accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()),
+ accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()), componentReference),
+ revision, permissions);
+ }
+
+ @Override
+ public UserEntity getUser(final String userId) {
+ final User user = userDAO.getUser(userId);
+ return createUserEntity(user, true);
+ }
+
+ @Override
+ public Set<UserEntity> getUsers() {
+ final Set<User> users = userDAO.getUsers();
+ return users.stream()
+ .map(user -> createUserEntity(user, false))
+ .collect(Collectors.toSet());
+ }
+
+ private UserEntity createUserEntity(final User user, final boolean enforceUserExistence) {
+ final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
+ final Set<TenantEntity> userGroups = userGroupDAO.getUserGroupsForUser(user.getIdentifier()).stream()
+ .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(enforceUserExistence)).collect(Collectors.toSet());
+ final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUser(user.getIdentifier()).stream()
+ .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
+ return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups, policyEntities), userRevision, permissions);
+ }
+
+ private UserGroupEntity createUserGroupEntity(final Group userGroup, final boolean enforceGroupExistence) {
+ final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
+ final Set<TenantEntity> users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(enforceGroupExistence)).collect(Collectors.toSet());
+ final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream()
+ .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
+ return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users, policyEntities), userGroupRevision, permissions);
+ }
+
+ @Override
+ public UserGroupEntity getUserGroup(final String userGroupId) {
+ final Group userGroup = userGroupDAO.getUserGroup(userGroupId);
+ return createUserGroupEntity(userGroup, true);
+ }
+
+ @Override
+ public Set<UserGroupEntity> getUserGroups() {
+ final Set<Group> userGroups = userGroupDAO.getUserGroups();
+ return userGroups.stream()
+ .map(userGroup -> createUserGroupEntity(userGroup, false))
+ .collect(Collectors.toSet());
+ }
+
+ private LabelEntity createLabelEntity(final Label label) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
+ return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), revision, permissions);
+ }
+
+ @Override
+ public Set<LabelEntity> getLabels(final String groupId) {
+ final Set<Label> labels = labelDAO.getLabels(groupId);
+ return labels.stream()
+ .map(label -> createLabelEntity(label))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public LabelEntity getLabel(final String labelId) {
+ final Label label = labelDAO.getLabel(labelId);
+ return createLabelEntity(label);
+ }
+
+ private FunnelEntity createFunnelEntity(final Funnel funnel) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(funnel.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
+ return entityFactory.createFunnelEntity(dtoFactory.createFunnelDto(funnel), revision, permissions);
+ }
+
+ @Override
+ public Set<FunnelEntity> getFunnels(final String groupId) {
+ final Set<Funnel> funnels = funnelDAO.getFunnels(groupId);
+ return funnels.stream()
+ .map(funnel -> createFunnelEntity(funnel))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public FunnelEntity getFunnel(final String funnelId) {
+ final Funnel funnel = funnelDAO.getFunnel(funnelId);
+ return createFunnelEntity(funnel);
+ }
+
+ private PortEntity createInputPortEntity(final Port port) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, NiFiUserUtils.getNiFiUser());
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), NiFiUserUtils.getNiFiUser());
+ final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ private PortEntity createOutputPortEntity(final Port port) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, NiFiUserUtils.getNiFiUser());
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), NiFiUserUtils.getNiFiUser());
+ final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public Set<PortEntity> getInputPorts(final String groupId) {
+ final Set<Port> inputPorts = inputPortDAO.getPorts(groupId);
+ return inputPorts.stream()
+ .map(port -> createInputPortEntity(port))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<PortEntity> getOutputPorts(final String groupId) {
+ final Set<Port> ports = outputPortDAO.getPorts(groupId);
+ return ports.stream()
+ .map(port -> createOutputPortEntity(port))
+ .collect(Collectors.toSet());
+ }
+
+ private ProcessGroupEntity createProcessGroupEntity(final ProcessGroup group) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(group);
+ final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(group.getIdentifier()));
+ final List<BulletinEntity> bulletins = getProcessGroupBulletins(group);
+ return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, permissions, status, bulletins);
+ }
+
+ private List<BulletinEntity> getProcessGroupBulletins(final ProcessGroup group) {
+ final List<Bulletin> bulletins = new ArrayList<>(bulletinRepository.findBulletinsForGroupBySource(group.getIdentifier()));
+
+ for (final ProcessGroup descendantGroup : group.findAllProcessGroups()) {
+ bulletins.addAll(bulletinRepository.findBulletinsForGroupBySource(descendantGroup.getIdentifier()));
+ }
+
+ List<BulletinEntity> bulletinEntities = new ArrayList<>();
+ for (final Bulletin bulletin : bulletins) {
+ bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin)));
+ }
+
+ return pruneAndSortBulletins(bulletinEntities, BulletinRepository.MAX_BULLETINS_PER_COMPONENT);
+ }
+
+ private List<BulletinEntity> pruneAndSortBulletins(final List<BulletinEntity> bulletinEntities, final int maxBulletins) {
+ // sort the bulletins
+ Collections.sort(bulletinEntities, new Comparator<BulletinEntity>() {
+ @Override
+ public int compare(BulletinEntity o1, BulletinEntity o2) {
+ if (o1 == null && o2 == null) {
+ return 0;
+ }
+ if (o1 == null) {
+ return 1;
+ }
+ if (o2 == null) {
+ return -1;
+ }
+
+ return -Long.compare(o1.getId(), o2.getId());
+ }
+ });
+
+ // prune the response to only include the max number of bulletins
+ if (bulletinEntities.size() > maxBulletins) {
+ return bulletinEntities.subList(0, maxBulletins);
+ } else {
+ return bulletinEntities;
+ }
+ }
+
+ @Override
+ public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId) {
+ final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId);
+ return groups.stream()
+ .map(group -> createProcessGroupEntity(group))
+ .collect(Collectors.toSet());
+ }
+
+ private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg, final NiFiUser user) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpg.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg, user);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(rpg), user);
+ final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(rpg, controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createRemoteProcessGroupEntity(dtoFactory.createRemoteProcessGroupDto(rpg), revision, permissions, operatePermissions, status, bulletinEntities);
+ }
+
+ @Override
+ public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final Set<RemoteProcessGroup> rpgs = remoteProcessGroupDAO.getRemoteProcessGroups(groupId);
+ return rpgs.stream()
+ .map(rpg -> createRemoteGroupEntity(rpg, user))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public PortEntity getInputPort(final String inputPortId) {
+ final Port port = inputPortDAO.getPort(inputPortId);
+ return createInputPortEntity(port);
+ }
+
+ @Override
+ public PortStatusEntity getInputPortStatus(final String inputPortId) {
+ final Port inputPort = inputPortDAO.getPort(inputPortId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPort);
+ final PortStatusDTO dto = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortId));
+ return entityFactory.createPortStatusEntity(dto, permissions);
+ }
+
+ @Override
+ public PortEntity getOutputPort(final String outputPortId) {
+ final Port port = outputPortDAO.getPort(outputPortId);
+ return createOutputPortEntity(port);
+ }
+
+ @Override
+ public PortStatusEntity getOutputPortStatus(final String outputPortId) {
+ final Port outputPort = outputPortDAO.getPort(outputPortId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPort);
+ final PortStatusDTO dto = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortId));
+ return entityFactory.createPortStatusEntity(dto, permissions);
+ }
+
+ @Override
+ public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) {
+ final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+ return createRemoteGroupEntity(rpg, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public RemoteProcessGroupStatusEntity getRemoteProcessGroupStatus(final String id) {
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
+ final RemoteProcessGroupStatusDTO dto = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(id));
+ return entityFactory.createRemoteProcessGroupStatusEntity(dto, permissions);
+ }
+
+ @Override
+ public StatusHistoryEntity getRemoteProcessGroupStatusHistory(final String id) {
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
+ final StatusHistoryDTO dto = controllerFacade.getRemoteProcessGroupStatusHistory(id);
+ return entityFactory.createStatusHistoryEntity(dto, permissions);
+ }
+
+ @Override
+ public CurrentUserEntity getCurrentUser() {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final CurrentUserEntity entity = new CurrentUserEntity();
+ entity.setIdentity(user.getIdentity());
+ entity.setAnonymous(user.isAnonymous());
+ entity.setProvenancePermissions(dtoFactory.createPermissionsDto(authorizableLookup.getProvenance()));
+ entity.setCountersPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getCounters()));
+ entity.setTenantsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
+ entity.setControllerPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getController()));
+ entity.setPoliciesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPolicies()));
+ entity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem()));
+ entity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers()));
+
+ entity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents()));
+
+ final Set<ComponentRestrictionPermissionDTO> componentRestrictionPermissions = new HashSet<>();
+ Arrays.stream(RequiredPermission.values()).forEach(requiredPermission -> {
+ final PermissionsDTO restrictionPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents(requiredPermission));
+
+ final RequiredPermissionDTO requiredPermissionDto = new RequiredPermissionDTO();
+ requiredPermissionDto.setId(requiredPermission.getPermissionIdentifier());
+ requiredPermissionDto.setLabel(requiredPermission.getPermissionLabel());
+
+ final ComponentRestrictionPermissionDTO componentRestrictionPermissionDto = new ComponentRestrictionPermissionDTO();
+ componentRestrictionPermissionDto.setRequiredPermission(requiredPermissionDto);
+ componentRestrictionPermissionDto.setPermissions(restrictionPermissions);
+
+ componentRestrictionPermissions.add(componentRestrictionPermissionDto);
+ });
+ entity.setComponentRestrictionPermissions(componentRestrictionPermissions);
+
+ return entity;
+ }
+
+ @Override
+ public ProcessGroupFlowEntity getProcessGroupFlow(final String groupId) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+
+ // Get the Process Group Status but we only need a status depth of one because for any child process group,
+ // we ignore the status of each individual components. I.e., if Process Group A has child Group B, and child Group B
+ // has a Processor, we don't care about the individual stats of that Processor because the ProcessGroupFlowEntity
+ // doesn't include that anyway. So we can avoid including the information in the status that is returned.
+ final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId, 1);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+ return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager, this::getProcessGroupBulletins), permissions);
+ }
+
+ @Override
+ public ProcessGroupEntity getProcessGroup(final String groupId) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ return createProcessGroupEntity(processGroup);
+ }
+
+ private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) {
+ final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(serviceNode);
+
+ final ControllerServiceReference ref = serviceNode.getReferences();
+ final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref, serviceIds);
+ dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
+
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode, NiFiUserUtils.getNiFiUser());
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(serviceNode), NiFiUserUtils.getNiFiUser());
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createControllerServiceEntity(dto, revision, permissions, operatePermissions, bulletinEntities);
+ }
+
+ @Override
+ public VariableRegistryEntity getVariableRegistry(final String groupId, final boolean includeAncestorGroups) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ if (processGroup == null) {
+ throw new ResourceNotFoundException("Could not find group with ID " + groupId);
+ }
+
+ return createVariableRegistryEntity(processGroup, includeAncestorGroups);
+ }
+
+ private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) {
+ final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup, revisionManager);
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+
+ if (includeAncestorGroups) {
+ ProcessGroup parent = processGroup.getParent();
+ while (parent != null) {
+ final PermissionsDTO parentPerms = dtoFactory.createPermissionsDto(parent);
+ if (Boolean.TRUE.equals(parentPerms.getCanRead())) {
+ final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent, revisionManager);
+ final Set<VariableEntity> parentVariables = parentRegistryDto.getVariables();
+ registryDto.getVariables().addAll(parentVariables);
+ }
+
+ parent = parent.getParent();
+ }
+ }
+
+ return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
+ }
+
+ @Override
+ public VariableRegistryEntity populateAffectedComponents(final VariableRegistryDTO variableRegistryDto) {
+ final String groupId = variableRegistryDto.getProcessGroupId();
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ if (processGroup == null) {
+ throw new ResourceNotFoundException("Could not find group with ID " + groupId);
+ }
+
+ final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup, revisionManager);
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+ return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
+ }
+
+ @Override
+ public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
+ final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
+ final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet());
+
+ return serviceNodes.stream()
+ .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public ControllerServiceEntity getControllerService(final String controllerServiceId) {
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
+ return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId));
+ }
+
+ @Override
+ public PropertyDescriptorDTO getControllerServicePropertyDescriptor(final String id, final String property) {
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id);
+ PropertyDescriptor descriptor = controllerService.getControllerServiceImplementation().getPropertyDescriptor(property);
+
+ // return an invalid descriptor if the controller service doesn't support this property
+ if (descriptor == null) {
+ descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+ }
+
+ final String groupId = controllerService.getProcessGroup() == null ? null : controllerService.getProcessGroup().getIdentifier();
+ return dtoFactory.createPropertyDescriptorDto(descriptor, groupId);
+ }
+
+ @Override
+ public ControllerServiceReferencingComponentsEntity getControllerServiceReferencingComponents(final String controllerServiceId) {
+ final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId);
+ final ControllerServiceReference ref = service.getReferences();
+ return createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerServiceId));
+ }
+
+ private ReportingTaskEntity createReportingTaskEntity(final ReportingTaskNode reportingTask) {
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(reportingTask.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
+ final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createReportingTaskEntity(dtoFactory.createReportingTaskDto(reportingTask), revision, permissions, operatePermissions, bulletinEntities);
+ }
+
+ @Override
+ public Set<ReportingTaskEntity> getReportingTasks() {
+ final Set<ReportingTaskNode> reportingTasks = reportingTaskDAO.getReportingTasks();
+ return reportingTasks.stream()
+ .map(reportingTask -> createReportingTaskEntity(reportingTask))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public ReportingTaskEntity getReportingTask(final String reportingTaskId) {
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
+ return createReportingTaskEntity(reportingTask);
+ }
+
+ @Override
+ public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(final String id, final String property) {
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(id);
+ PropertyDescriptor descriptor = reportingTask.getReportingTask().getPropertyDescriptor(property);
+
+ // return an invalid descriptor if the reporting task doesn't support this property
+ if (descriptor == null) {
+ descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+ }
+
+ return dtoFactory.createPropertyDescriptorDto(descriptor, null);
+ }
+
+ @Override
+ public StatusHistoryEntity getProcessGroupStatusHistory(final String groupId) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+ final StatusHistoryDTO dto = controllerFacade.getProcessGroupStatusHistory(groupId);
+ return entityFactory.createStatusHistoryEntity(dto, permissions);
+ }
+
+ @Override
+ public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+
+ final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
+ final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
+
+ // Create a VersionedProcessGroup snapshot of the flow as it is currently.
+ final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
+
+ final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();
+ final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
+
+ final VersionedFlow versionedFlow = new VersionedFlow();
+ versionedFlow.setBucketIdentifier(versionedFlowDto.getBucketId());
+ versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
+ versionedFlow.setDescription(versionedFlowDto.getDescription());
+ versionedFlow.setModifiedTimestamp(versionedFlow.getCreatedTimestamp());
+ versionedFlow.setName(versionedFlowDto.getFlowName());
+ versionedFlow.setIdentifier(flowId);
+
+ // Add the Versioned Flow and first snapshot to the Flow Registry
+ final String registryId = requestEntity.getVersionedFlow().getRegistryId();
+ final VersionedFlowSnapshot registeredSnapshot;
+ final VersionedFlow registeredFlow;
+
+ String action = "create the flow";
+ try {
+ // first, create the flow in the registry, if necessary
+ if (versionedFlowDto.getFlowId() == null) {
+ registeredFlow = registerVersionedFlow(registryId, versionedFlow);
+ } else {
+ registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId());
+ }
+
+ action = "add the local flow to the Flow Registry as the first Snapshot";
+
+ // add first snapshot to the flow in the registry
+ registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion);
+ } catch (final NiFiRegistryException e) {
+ throw new IllegalArgumentException(e.getLocalizedMessage());
+ } catch (final IOException ioe) {
+ throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to " + action);
+ }
+
+ final Bucket bucket = registeredSnapshot.getBucket();
+ final VersionedFlow flow = registeredSnapshot.getFlow();
+
+ // Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
+ final VersionControlInformationDTO vci = new VersionControlInformationDTO();
+ vci.setBucketId(bucket.getIdentifier());
+ vci.setBucketName(bucket.getName());
+ vci.setFlowId(flow.getIdentifier());
+ vci.setFlowName(flow.getName());
+ vci.setFlowDescription(flow.getDescription());
+ vci.setGroupId(groupId);
+ vci.setRegistryId(registryId);
+ vci.setRegistryName(getFlowRegistryName(registryId));
+ vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
+ vci.setState(VersionedFlowState.UP_TO_DATE.name());
+
+ final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
+
+ final Revision groupRevision = revisionManager.getRevision(groupId);
+ final RevisionDTO groupRevisionDto = dtoFactory.createRevisionDTO(groupRevision);
+
+ final VersionControlComponentMappingEntity entity = new VersionControlComponentMappingEntity();
+ entity.setVersionControlInformation(vci);
+ entity.setProcessGroupRevision(groupRevisionDto);
+ entity.setVersionControlComponentMapping(mapping);
+ return entity;
+ }
+
+ @Override
+ public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) {
+ final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
+ if (registry == null) {
+ throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId);
+ }
+
+ try {
+ return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
+ } catch (final IOException | NiFiRegistryException e) {
+ throw new NiFiCoreException("Failed to remove flow from Flow Registry due to " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
+ if (versionControlInfo == null) {
+ return null;
+ }
+
+ final VersionControlInformationDTO versionControlDto = dtoFactory.createVersionControlInformationDto(processGroup);
+ final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId));
+ return entityFactory.createVersionControlInformationEntity(versionControlDto, groupRevision);
+ }
+
+ private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
+ final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
+ final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
+ return versionedGroup;
+ }
+
+ @Override
+ public FlowComparisonEntity getLocalModifications(final String processGroupId) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
+ final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
+ if (versionControlInfo == null) {
+ throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control");
+ }
+
+ final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryIdentifier());
+ if (flowRegistry == null) {
+ throw new IllegalStateException("Process Group with ID " + processGroupId + " is tracking to a flow in Flow Registry with ID " + versionControlInfo.getRegistryIdentifier()
+ + " but cannot find a Flow Registry with that identifier");
+ }
+
+ final VersionedFlowSnapshot versionedFlowSnapshot;
+ try {
+ versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
+ versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true, NiFiUserUtils.getNiFiUser());
+ } catch (final IOException | NiFiRegistryException e) {
+ throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e);
+ }
+
+ final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
+ final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
+ final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
+
+ final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
+ final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
+
+ final Set<String> ancestorServiceIds = getAncestorGroupServiceIds(processGroup);
+ final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
+ final FlowComparison flowComparison = flowComparator.compare();
+
+ final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
+
+ final FlowComparisonEntity entity = new FlowComparisonEntity();
+ entity.setComponentDifferences(differenceDtos);
+ return entity;
+ }
+
+ private Set<String> getAncestorGroupServiceIds(final ProcessGroup group) {
+ final Set<String> ancestorServiceIds;
+ ProcessGroup parentGroup = group.getParent();
+
+ if (parentGroup == null) {
+ ancestorServiceIds = Collections.emptySet();
+ } else {
+ ancestorServiceIds = parentGroup.getControllerServices(true).stream()
+ .map(cs -> {
+ // We want to map the Controller Service to its Versioned Component ID, if it has one.
+ // If it does not have one, we want to generate it in the same way that our Flow Mapper does
+ // because this allows us to find the Controller Service when doing a Flow Diff.
+ final Optional<String> versionedId = cs.getVersionedComponentId();
+ if (versionedId.isPresent()) {
+ return versionedId.get();
+ }
+
+ return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
+ })
+ .collect(Collectors.toSet());
+ }
+
+ return ancestorServiceIds;
+ }
+
+ @Override
+ public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) {
+ final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
+ if (registry == null) {
+ throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+ }
+
+ try {
+ return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
+ } catch (final IOException | NiFiRegistryException e) {
+ throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
+ }
+ }
+
+ private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
+ final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
+ if (registry == null) {
+ throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+ }
+
+ return registry.getVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
+ final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) {
+ final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
+ if (registry == null) {
+ throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+ }
+
+ try {
+ return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
+ } catch (final IOException | NiFiRegistryException e) {
+ throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public VersionControlInformationEntity setVersionControlInformation(final Revision revision, final String processGroupId,
+ final VersionControlInformationDTO versionControlInfo, final Map<String, String> versionedComponentMapping) {
+
+ final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
+
+ final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision,
+ group,
+ () -> processGroupDAO.updateVersionControlInformation(versionControlInfo, versionedComponentMapping),
+ processGroup -> dtoFactory.createVersionControlInformationDto(processGroup));
+
+ return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()));
+ }
+
+ @Override
+ public VersionControlInformationEntity deleteVersionControl(final Revision revision, final String processGroupId) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
+
+ final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision,
+ group,
+ () -> processGroupDAO.disconnectVersionControl(processGroupId),
+ processGroup -> dtoFactory.createVersionControlInformationDto(group));
+
+ return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()));
+ }
+
+ @Override
+ public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+ group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
+ }
+
+ @Override
+ public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+ group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId);
+ }
+
+ @Override
+ public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+ group.verifyCanRevertLocalModifications();
+
+ // verify that the process group can be updated to the given snapshot. We do not verify that connections can
+ // be removed, because the flow may still be running, and it only matters that the connections can be removed once the components
+ // have been stopped.
+ group.verifyCanUpdate(versionedFlowSnapshot, false, false);
+ }
+
+ @Override
+ public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
+
+ final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
+ final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
+
+ final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
+ final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents());
+
+ final Set<String> ancestorGroupServiceIds = getAncestorGroupServiceIds(group);
+ final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor());
+ final FlowComparison comparison = flowComparator.compare();
+
+ final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
+ .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
+ .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
+ .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
+ .filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
+ .map(difference -> {
+ final VersionedComponent localComponent = difference.getComponentA();
+
+ final String state;
+ switch (localComponent.getComponentType()) {
+ case CONTROLLER_SERVICE:
+ final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
+ state = controllerServiceDAO.getControllerService(serviceId).getState().name();
+ break;
+ case PROCESSOR:
+ final String processorId = ((InstantiatedVersionedProcessor) localComponent).getInstanceId();
+ state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name();
+ break;
+ case REMOTE_INPUT_PORT:
+ final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
+ state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name();
+ break;
+ case REMOTE_OUTPUT_PORT:
+ final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
+ state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name();
+ break;
+ default:
+ state = null;
+ break;
+ }
+
+ return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state);
+ })
+ .collect(Collectors.toCollection(HashSet::new));
+
+ for (final FlowDifference difference : comparison.getDifferences()) {
+ // Ignore these as local differences for now because we can't do anything with it
+ if (difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED) {
+ continue;
+ }
+
+ // Ignore differences for adding remote ports
+ if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
+ continue;
+ }
+
+ if (FlowDifferenceFilters.isIgnorableVersionedFlowCoordinateChange(difference)) {
+ continue;
+ }
+
+ final VersionedComponent localComponent = difference.getComponentA();
+ if (localComponent == null) {
+ continue;
+ }
+
+ // If any Process Group is removed, consider all components below that Process Group as an affected component
+ if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP) {
+ final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId();
+ final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId);
+
+ localGroup.findAllProcessors().stream()
+ .map(comp -> createAffectedComponentEntity(comp))
+ .forEach(affectedComponents::add);
+ localGroup.findAllFunnels().stream()
+ .map(comp -> createAffectedComponentEntity(comp))
+ .forEach(affectedComponents::add);
+ localGroup.findAllInputPorts().stream()
+ .map(comp -> createAffectedComponentEntity(comp))
+ .forEach(affectedComponents::add);
+ localGroup.findAllOutputPorts().stream()
+ .map(comp -> createAffectedComponentEntity(comp))
+ .forEach(affectedComponents::add);
+ localGroup.findAllRemoteProcessGroups().stream()
+ .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
+ .map(comp -> createAffectedComponentEntity(comp))
+ .forEach(affectedComponents::add);
+ localGroup.findAllControllerServices().stream()
+ .map(comp -> createAffectedComponentEntity(comp))
+ .forEach(affectedComponents::add);
+ }
+
+ if (localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE) {
+ final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
+ final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
+
+ final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
+ for (final ControllerServiceNode referencingService : referencingServices) {
+ affectedComponents.add(createAffectedComponentEntity(referencingService));
+ }
+
+ final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
+ for (final ProcessorNode referencingProcessor : referencingProcessors) {
+ affectedComponents.add(createAffectedComponentEntity(referencingProcessor));
+ }
+ }
+ }
+
+ // Create a map of all connectable components by versioned component ID to the connectable component itself
+ final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>();
+ mapToConnectableId(group.findAllFunnels(), connectablesByVersionId);
+ mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId);
+ mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId);
+ mapToConnectableId(group.findAllProcessors(), connectablesByVersionId);
+
+ final List<RemoteGroupPort> remotePorts = new ArrayList<>();
+ for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) {
+ remotePorts.addAll(rpg.getInputPorts());
+ remotePorts.addAll(rpg.getOutputPorts());
+ }
+ mapToConnectableId(remotePorts, connectablesByVersionId);
+
+ // If any connection is added or modified, we need to stop both the source (if it exists in the flow currently)
+ // and the destination (if it exists in the flow currently).
+ for (final FlowDifference difference : comparison.getDifferences()) {
+ VersionedComponent component = difference.getComponentA();
+ if (component == null) {
+ component = difference.getComponentB();
+ }
+
+ if (component.getComponentType() != org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
+ continue;
+ }
+
+ final VersionedConnection connection = (VersionedConnection) component;
+
+ final String sourceVersionedId = connection.getSource().getId();
+ final List<Connectable> sources = connectablesByVersionId.get(sourceVersionedId);
+ if (sources != null) {
+ for (final Connectable source : sources) {
+ affectedComponents.add(createAffectedComponentEntity(source));
+ }
+ }
+
+ final String destinationVersionId = connection.getDestination().getId();
+ final List<Connectable> destinations = connectablesByVersionId.get(destinationVersionId);
+ if (destinations != null) {
+ for (final Connectable destination : destinations) {
+ affectedComponents.add(createAffectedComponentEntity(destination));
+ }
+ }
+ }
+
+ return affectedComponents;
+ }
+
+ private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) {
+ for (final Connectable connectable : connectables) {
+ final Optional<String> versionedIdOption = connectable.getVersionedComponentId();
+
+ // Determine the Versioned ID by using the ID that is assigned, if one is. Otherwise,
+ // we will calculate the Versioned ID. This allows us to map connectables that currently are not under
+ // version control. We have to do this so that if we are changing flow versions and have a component that is running and it does not exist
+ // in the Versioned Flow, we still need to be able to create an AffectedComponentDTO for it.
+ final String versionedId;
+ if (versionedIdOption.isPresent()) {
+ versionedId = versionedIdOption.get();
+ } else {
+ versionedId = UUID.nameUUIDFromBytes(connectable.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
+ }
+
+ final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId, key -> new ArrayList<>());
+ byVersionedId.add(connectable);
+ }
+ }
+
+
+ private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable) {
+ final AffectedComponentEntity entity = new AffectedComponentEntity();
+ entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectable.getIdentifier())));
+ entity.setId(connectable.getIdentifier());
+
+ final Authorizable authorizable = getAuthorizable(connectable);
+ final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
+ entity.setPermissions(permissionsDto);
+
+ final AffectedComponentDTO dto = new AffectedComponentDTO();
+ dto.setId(connectable.getIdentifier());
+ dto.setReferenceType(connectable.getConnectableType().name());
+ dto.setState(connectable.getScheduledState().name());
+
+ final String groupId = connectable instanceof RemoteGroupPort ? ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : connectable.getProcessGroupIdentifier();
+ dto.setProcessGroupId(groupId);
+
+ entity.setComponent(dto);
+ return entity;
+ }
+
+ private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode) {
+ final AffectedComponentEntity entity = new AffectedComponentEntity();
+ entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())));
+ entity.setId(serviceNode.getIdentifier());
+
+ final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable();
+ final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
+ entity.setPermissions(permissionsDto);
+
+ final AffectedComponentDTO dto = new AffectedComponentDTO();
+ dto.setId(serviceNode.getIdentifier());
+ dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ dto.setProcessGroupId(serviceNode.getProcessGroupIdentifier());
+ dto.setState(serviceNode.getState().name());
+
+ entity.setComponent(dto);
+ return entity;
+ }
+
+ private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState) {
+ final AffectedComponentEntity entity = new AffectedComponentEntity();
+ entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId())));
+ entity.setId(instance.getInstanceId());
+
+ final Authorizable authorizable = getAuthorizable(componentTypeName, instance);
+ final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
+ entity.setPermissions(permissionsDto);
+
+ final AffectedComponentDTO dto = new AffectedComponentDTO();
+ dto.setId(instance.getInstanceId());
+ dto.setReferenceType(componentTypeName);
+ dto.setProcessGroupId(instance.getInstanceGroupId());
+ dto.setState(componentState);
+
+ entity.setComponent(dto);
+ return entity;
+ }
+
+
+ private Authorizable getAuthorizable(final Connectable connectable) {
+ switch (connectable.getConnectableType()) {
+ case REMOTE_INPUT_PORT:
+ case REMOTE_OUTPUT_PORT:
+ final String rpgId = ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier();
+ return authorizableLookup.getRemoteProcessGroup(rpgId);
+ default:
+ return authorizableLookup.getLocalConnectable(connectable.getIdentifier());
+ }
+ }
+
+ private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) {
+ final String componentId = versionedComponent.getInstanceId();
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE.name())) {
+ return authorizableLookup.getControllerService(componentId).getAuthorizable();
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONNECTION.name())) {
+ return authorizableLookup.getConnection(componentId).getAuthorizable();
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.FUNNEL.name())) {
+ return authorizableLookup.getFunnel(componentId);
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.INPUT_PORT.name())) {
+ return authorizableLookup.getInputPort(componentId);
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.OUTPUT_PORT.name())) {
+ return authorizableLookup.getOutputPort(componentId);
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.LABEL.name())) {
+ return authorizableLookup.getLabel(componentId);
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP.name())) {
+ return authorizableLookup.getProcessGroup(componentId).getAuthorizable();
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESSOR.name())) {
+ return authorizableLookup.getProcessor(componentId).getAuthorizable();
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_INPUT_PORT.name())) {
+ return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_OUTPUT_PORT.name())) {
+ return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
+ }
+
+ if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) {
+ return authorizableLookup.getRemoteProcessGroup(componentId);
+ }
+
+ return null;
+ }
+
+ @Override
+ public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
+ final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
+ if (flowRegistry == null) {
+ throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
+ }
+
+ final VersionedFlowSnapshot snapshot;
+ try {
+ snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
+ } catch (final NiFiRegistryException | IOException e) {
+ logger.error(e.getMessage(), e);
+ throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+ + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
+ }
+
+ return snapshot;
+ }
+
+ @Override
+ public String getFlowRegistryName(final String flowRegistryId) {
+ final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId);
+ return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
+ }
+
+ private List<Revision> getComponentRevisions(final ProcessGroup processGroup, final boolean includeGroupRevision) {
+ final List<Revision> revisions = new ArrayList<>();
+ if (includeGroupRevision) {
+ revisions.add(revisionManager.getRevision(processGroup.getIdentifier()));
+ }
+
+ processGroup.findAllConnections().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllControllerServices().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllFunnels().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllInputPorts().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllOutputPorts().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllLabels().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllProcessGroups().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllProcessors().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllRemoteProcessGroups().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllRemoteProcessGroups().stream()
+ .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+
+ return revisions;
+ }
+
+ @Override
+ public ProcessGroupEntity updateProcessGroupContents(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
+ final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ final List<Revision> revisions = getComponentRevisions(processGroup, false);
+ revisions.add(revision);
+
+ final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
+
+ final RevisionUpdate<ProcessGroupDTO> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<ProcessGroupDTO>() {
+ @Override
+ public RevisionUpdate<ProcessGroupDTO> update() {
+ // update the Process Group
+ processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
+
+ // update the revisions
+ final Set<Revision> updatedRevisions = revisions.stream()
+ .map(rev -> revisionManager.getRevision(rev.getComponentId()).incrementRevision(revision.getClientId()))
+ .collect(Collectors.toSet());
+
+ // save
+ controllerFacade.save();
+
+ // gather details for response
+ final ProcessGroupDTO dto = dtoFactory.createProcessGroupDto(processGroup);
+
+ final Revision updatedRevision = revisionManager.getRevision(groupId).incrementRevision(revision.getClientId());
+ final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastModification, updatedRevisions);
+ }
+ });
+
+ final FlowModification lastModification = revisionUpdate.getLastModification();
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+ final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(lastModification);
+ final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
+ final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+ return entityFactory.createProcessGroupEntity(revisionUpdate.getComponent(), updatedRevision, permissions, status, bulletinEntities);
+ }
+
+ private AuthorizationResult authorizeAction(final Action action) {
+ final String sourceId = action.getSourceId();
+ final Component type = action.getSourceType();
+
+ Authorizable authorizable;
+ try {
+ switch (type) {
+ case Processor:
+ authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable();
+ break;
+ case ReportingTask:
+ authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable();
+ break;
+ case ControllerService:
+ authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable();
+ break;
+ case Controller:
+ authorizable = controllerFacade;
+ break;
+ case InputPort:
+ authorizable = authorizableLookup.getInputPort(sourceId);
+ break;
+ case OutputPort:
+ authorizable = authorizableLookup.getOutputPort(sourceId);
+ break;
+ case ProcessGroup:
+ authorizable = authorizableLookup.getProcessGroup(sourceId).getAuthorizable();
+ break;
+ case RemoteProcessGroup:
+ authorizable = authorizableLookup.getRemoteProcessGroup(sourceId);
+ break;
+ case Funnel:
+ authorizable = authorizableLookup.getFunnel(sourceId);
+ break;
+ case Connection:
+ authorizable = authorizableLookup.getConnection(sourceId).getAuthorizable();
+ break;
+ case AccessPolicy:
+ authorizable = authorizableLookup.getAccessPolicyById(sourceId);
+ break;
+ case User:
+ case UserGroup:
+ authorizable = authorizableLookup.getTenant();
+ break;
+ default:
+ throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this action.").build());
+ }
+ } catch (final ResourceNotFoundException e) {
+ // if the underlying component is gone, use the controller to see if permissions should be allowed
+ authorizable = controllerFacade;
+ }
+
+ // perform the authorization
+ return authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public HistoryDTO getActions(final HistoryQueryDTO historyQueryDto) {
+ // extract the query criteria
+ final HistoryQuery historyQuery = new HistoryQuery();
+ historyQuery.setStartDate(historyQueryDto.getStartDate());
+ historyQuery.setEndDate(historyQueryDto.getEndDate());
+ historyQuery.setSourceId(historyQueryDto.getSourceId());
+ historyQuery.setUserIdentity(historyQueryDto.getUserIdentity());
+ historyQuery.setOffset(historyQueryDto.getOffset());
+ historyQuery.setCount(historyQueryDto.getCount());
+ historyQuery.setSortColumn(historyQueryDto.getSortColumn());
+ historyQuery.setSortOrder(historyQueryDto.getSortOrder());
+
+ // perform the query
+ final History history = auditService.getActions(historyQuery);
+
+ // only retain authorized actions
+ final HistoryDTO historyDto = dtoFactory.createHistoryDto(history);
+ if (history.getActions() != null) {
+ final List<ActionEntity> actionEntities = new ArrayList<>();
+ for (final Action action : history.getActions()) {
+ final AuthorizationResult result = authorizeAction(action);
+ actionEntities.add(entityFactory.createActionEntity(dtoFactory.createActionDto(action), Result.Approved.equals(result.getResult())));
+ }
+ historyDto.setActions(actionEntities);
+ }
+
+ // create the response
+ return historyDto;
+ }
+
+ @Override
+ public ActionEntity getAction(final Integer actionId) {
+ // get the action
+ final Action action = auditService.getAction(actionId);
+
+ // ensure the action was found
+ if (action == null) {
+ throw new ResourceNotFoundException(String.format("Unable to find action with id '%s'.", actionId));
+ }
+
+ final AuthorizationResult result = authorizeAction(action);
+ final boolean authorized = Result.Approved.equals(result.getResult());
+ if (!authorized) {
+ throw new AccessDeniedException(result.getExplanation());
+ }
+
+ // return the action
+ return entityFactory.createActionEntity(dtoFactory.createActionDto(action), authorized);
+ }
+
+ @Override
+ public ComponentHistoryDTO getComponentHistory(final String componentId) {
+ final Map<String, PropertyHistoryDTO> propertyHistoryDtos = new LinkedHashMap<>();
+ final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(componentId);
+
+ for (final Map.Entry<String, List<PreviousValue>> entry : propertyHistory.entrySet()) {
+ final List<PreviousValueDTO> previousValueDtos = new ArrayList<>();
+
+ for (final PreviousValue previousValue : entry.getValue()) {
+ final PreviousValueDTO dto = new PreviousValueDTO();
+ dto.setPreviousValue(previousValue.getPreviousValue());
+ dto.setTimestamp(previousValue.getTimestamp());
+ dto.setUserIdentity(previousValue.getUserIdentity());
+ previousValueDtos.add(dto);
+ }
+
+ if (!previousValueDtos.isEmpty()) {
+ final PropertyHistoryDTO propertyHistoryDto = new PropertyHistoryDTO();
+ propertyHistoryDto.setPreviousValues(previousValueDtos);
+ propertyHistoryDtos.put(entry.getKey(), propertyHistoryDto);
+ }
+ }
+
+ final ComponentHistoryDTO history = new ComponentHistoryDTO();
+ history.setComponentId(componentId);
+ history.setPropertyHistory(propertyHistoryDtos);
+
+ return history;
+ }
+
+ @Override
+ public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
+ final ProcessorNode processor = processorDAO.getProcessor(id);
+ final ProcessorStatus processorStatus = controllerFacade.getProcessorStatus(id);
+
+ // Generate Processor Diagnostics
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final ProcessorDiagnosticsDTO dto = controllerFacade.getProcessorDiagnostics(processor, processorStatus, bulletinRepository, serviceId -> {
+ final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
+ return createControllerServiceEntity(serviceNode, Collections.emptySet());
+ });
+
+ // Filter anything out of diagnostics that the user is not authorized to see.
+ final List<JVMDiagnosticsSnapshotDTO> jvmDiagnosticsSnaphots = new ArrayList<>();
+ final JVMDiagnosticsDTO jvmDiagnostics = dto.getJvmDiagnostics();
+ jvmDiagnosticsSnaphots.add(jvmDiagnostics.getAggregateSnapshot());
+
+ // filter controller-related information
+ final boolean canReadController = authorizableLookup.getController().isAuthorized(authorizer, RequestAction.READ, user);
+ if (!canReadController) {
+ for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
+ snapshot.setControllerDiagnostics(null);
+ }
+ }
+
+ // filter system diagnostics information
+ final boolean canReadSystem = authorizableLookup.getSystem().isAuthorized(authorizer, RequestAction.READ, user);
+ if (!canReadSystem) {
+ for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
+ snapshot.setSystemDiagnosticsDto(null);
+ }
+ }
+
+ final boolean canReadFlow = authorizableLookup.getFlow().isAuthorized(authorizer, RequestAction.READ, user);
+ if (!canReadFlow) {
+ for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
+ snapshot.setFlowDiagnosticsDto(null);
+ }
+ }
+
+ // filter connections
+ final Predicate<ConnectionDiagnosticsDTO> connectionAuthorized = connectionDiagnostics -> {
+ final String connectionId = connectionDiagnostics.getConnection().getId();
+ return authorizableLookup.getConnection(connectionId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user);
+ };
+
+ // Filter incoming connections by what user is authorized to READ
+ final Set<ConnectionDiagnosticsDTO> incoming = dto.getIncomingConnections();
+ final Set<ConnectionDiagnosticsDTO> filteredIncoming = incoming.stream()
+ .filter(connectionAuthorized)
+ .collect(Collectors.toSet());
+
+ dto.setIncomingConnections(filteredIncoming);
+
+ // Filter outgoing connections by what user is authorized to READ
+ final Set<ConnectionDiagnosticsDTO> outgoing = dto.getOutgoingConnections();
+ final Set<ConnectionDiagnosticsDTO> filteredOutgoing = outgoing.stream()
+ .filter(connectionAuthorized)
+ .collect(Collectors.toSet());
+ dto.setOutgoingConnections(filteredOutgoing);
+
+ // Filter out any controller services that are referenced by the Processor
+ final Set<ControllerServiceDiagnosticsDTO> referencedServices = dto.getReferencedControllerServices();
+ final Set<ControllerServiceDiagnosticsDTO> filteredReferencedServices = referencedServices.stream()
+ .filter(csDiagnostics -> {
+ final String csId = csDiagnostics.getControllerService().getId();
+ return authorizableLookup.getControllerService(csId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user);
+ })
+ .map(csDiagnostics -> {
+ // Filter out any referencing components because those are generally not relevant from this context.
+ final ControllerServiceDTO serviceDto = csDiagnostics.getControllerService().getComponent();
+ if (serviceDto != null) {
+ serviceDto.setReferencingComponents(null);
+ }
+ return csDiagnostics;
+ })
+ .collect(Collectors.toSet());
+ dto.setReferencedControllerServices(filteredReferencedServices);
+
+ final Revision revision = revisionManager.getRevision(id);
+ final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revision);
+ final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(processor);
+ final List<BulletinEntity> bulletins = bulletinRepository.findBulletinsForSource(id).stream()
+ .map(bulletin -> dtoFactory.createBulletinDto(bulletin))
+ .map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissionsDto.getCanRead()))
+ .collect(Collectors.toList());
+
+ final ProcessorStatusDTO processorStatusDto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
+ return entityFactory.createProcessorDiagnosticsEntity(dto, revisionDto, permissionsDto, processorStatusDto, bulletins);
+ }
+
+ @Override
+ public boolean isClustered() {
+ return controllerFacade.isClustered();
+ }
+
+ @Override
+ public String getNodeId() {
+ final NodeIdentifier nodeId = controllerFacade.getNodeId();
+ if (nodeId != null) {
+ return nodeId.getId();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public ClusterDTO getCluster() {
+ // create cluster summary dto
+ final ClusterDTO clusterDto = new ClusterDTO();
+
+ // set current time
+ clusterDto.setGenerated(new Date());
+
+ // create node dtos
+ final List<NodeDTO> nodeDtos = clusterCoordinator.getNodeIdentifiers().stream()
+ .map(nodeId -> getNode(nodeId))
+ .collect(Collectors.toList());
+ clusterDto.setNodes(nodeDtos);
+
+ return clusterDto;
+ }
+
+ @Override
+ public NodeDTO getNode(final String nodeId) {
+ final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId);
+ return getNode(nodeIdentifier);
+ }
+
+ private NodeDTO getNode(final NodeIdentifier nodeId) {
+ final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId);
+ final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId);
+ final Set<String> roles = getRoles(nodeId);
+ final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);
+ return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, roles);
+ }
+
+ private Set<String> getRoles(final NodeIdentifier nodeId) {
+ final Set<String> roles = new HashSet<>();
+ final String nodeAddress = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort();
+
+ for (final String roleName : ClusterRoles.getAllRoles()) {
+ final String leader = leaderElectionManager.getLeader(roleName);
+ if (leader == null) {
+ continue;
+ }
+
+ if (leader.equals(nodeAddress)) {
+ roles.add(roleName);
+ }
+ }
+
+ return roles;
+ }
+
+ @Override
+ public void deleteNode(final String nodeId) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+ }
+
+ final String userDn = user.getIdentity();
+ final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId);
+ if (nodeIdentifier == null) {
+ throw new UnknownNodeException("Cannot remove Node with ID " + nodeId + " because it is not part of the cluster");
+ }
+
+ final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier);
+ if (!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && !nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) {
+ throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId +
+ " because it is not disconnected or offloaded, current state = " + nodeConnectionStatus.getState());
+ }
+
+ clusterCoordinator.removeNode(nodeIdentifier, userDn);
+ heartbeatMonitor.removeHeartbeat(nodeIdentifier);
+ }
+
+ /* reusable function declarations for converting ids to tenant entities */
+ private Function<String, TenantEntity> mapUserGroupIdToTenantEntity(final boolean enforceGroupExistence) {
+ return userGroupId -> {
+ final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroupId));
+
+ final Group group;
+ if (enforceGroupExistence || userGroupDAO.hasUserGroup(userGroupId)) {
+ group = userGroupDAO.getUserGroup(userGroupId);
+ } else {
+ group = new Group.Builder().identifier(userGroupId).name("Group ID - " + userGroupId + " (removed externally)").build();
+ }
+
+ return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(group), userGroupRevision,
+ dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
+ };
+ }
+
+ private Function<String, TenantEntity> mapUserIdToTenantEntity(final boolean enforceUserExistence) {
+ return userId -> {
+ final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId));
+
+ final User user;
+ if (enforceUserExistence || userDAO.hasUser(userId)) {
+ user = userDAO.getUser(userId);
+ } else {
+ user = new User.Builder().identifier(userId).identity("User ID - " + userId + " (removed externally)").build();
+ }
+
+ return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(user), userRevision,
+ dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
+ };
+ }
+
+
+ /* setters */
+ public void setProperties(final NiFiProperties properties) {
+ this.properties = properties;
+ }
+
+ public void setControllerFacade(final ControllerFacade controllerFacade) {
+ this.controllerFacade = controllerFacade;
+ }
+
+ public void setRemoteProcessGroupDAO(final RemoteProcessGroupDAO remoteProcessGroupDAO) {
+ this.remoteProcessGroupDAO = remoteProcessGroupDAO;
+ }
+
+ public void setLabelDAO(final LabelDAO labelDAO) {
+ this.labelDAO = labelDAO;
+ }
+
+ public void setFunnelDAO(final FunnelDAO funnelDAO) {
+ this.funnelDAO = funnelDAO;
+ }
+
+ public void setSnippetDAO(final SnippetDAO snippetDAO) {
+ this.snippetDAO = snippetDAO;
+ }
+
+ public void setProcessorDAO(final ProcessorDAO processorDAO) {
+ this.processorDAO = processorDAO;
+ }
+
+ public void setConnectionDAO(final ConnectionDAO connectionDAO) {
+ this.connectionDAO = connectionDAO;
+ }
+
+ public void setAuditService(final AuditService auditService) {
+ this.auditService = auditService;
+ }
+
+ public void setRevisionManager(final RevisionManager revisionManager) {
+ this.revisionManager = revisionManager;
+ }
+
+ public void setDtoFactory(final DtoFactory dtoFactory) {
+ this.dtoFactory = dtoFactory;
+ }
+
+ public void setEntityFactory(final EntityFactory entityFactory) {
+ this.entityFactory = entityFactory;
+ }
+
+ public void setInputPortDAO(final PortDAO inputPortDAO) {
+ this.inputPortDAO = inputPortDAO;
+ }
+
+ public void setOutputPortDAO(final PortDAO outputPortDAO) {
+ this.outputPortDAO = outputPortDAO;
+ }
+
+ public void setProcessGroupDAO(final ProcessGroupDAO processGroupDAO) {
+ this.processGroupDAO = processGroupDAO;
+ }
+
+ public void setControllerServiceDAO(final ControllerServiceDAO controllerServiceDAO) {
+ this.controllerServiceDAO = controllerServiceDAO;
+ }
+
+ public void setReportingTaskDAO(final ReportingTaskDAO reportingTaskDAO) {
+ this.reportingTaskDAO = reportingTaskDAO;
+ }
+
+ public void setTemplateDAO(final TemplateDAO templateDAO) {
+ this.templateDAO = templateDAO;
+ }
+
+ public void setSnippetUtils(final SnippetUtils snippetUtils) {
+ this.snippetUtils = snippetUtils;
+ }
+
+ public void setAuthorizableLookup(final AuthorizableLookup authorizableLookup) {
+ this.authorizableLookup = authorizableLookup;
+ }
+
+ public void setAuthorizer(final Authorizer authorizer) {
+ this.authorizer = authorizer;
+ }
+
+ public void setUserDAO(final UserDAO userDAO) {
+ this.userDAO = userDAO;
+ }
+
+ public void setUserGroupDAO(final UserGroupDAO userGroupDAO) {
+ this.userGroupDAO = userGroupDAO;
+ }
+
+ public void setAccessPolicyDAO(final AccessPolicyDAO accessPolicyDAO) {
+ this.accessPolicyDAO = accessPolicyDAO;
+ }
+
+ public void setClusterCoordinator(final ClusterCoordinator coordinator) {
+ this.clusterCoordinator = coordinator;
+ }
+
+ public void setHeartbeatMonitor(final HeartbeatMonitor heartbeatMonitor) {
+ this.heartbeatMonitor = heartbeatMonitor;
+ }
+
+ public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+ this.bulletinRepository = bulletinRepository;
+ }
+
+ public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) {
+ this.leaderElectionManager = leaderElectionManager;
+ }
+
+ public void setRegistryDAO(RegistryDAO registryDao) {
+ this.registryDAO = registryDao;
+ }
+
+ public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) {
+ this.flowRegistryClient = flowRegistryClient;
+ }
+}