summaryrefslogtreecommitdiffstats
path: root/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
diff options
context:
space:
mode:
Diffstat (limited to 'mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java')
-rw-r--r--mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java4899
1 files changed, 0 insertions, 4899 deletions
diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
deleted file mode 100644
index 8ad05bd..0000000
--- a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ /dev/null
@@ -1,4899 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Modifications to the original nifi code for the ONAP project are made
- * available under the Apache License, Version 2.0
- */
-package org.apache.nifi.web;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.nifi.action.Action;
-import org.apache.nifi.action.Component;
-import org.apache.nifi.action.FlowChangeAction;
-import org.apache.nifi.action.Operation;
-import org.apache.nifi.action.details.FlowChangePurgeDetails;
-import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.authorization.AccessDeniedException;
-import org.apache.nifi.authorization.AccessPolicy;
-import org.apache.nifi.authorization.AuthorizableLookup;
-import org.apache.nifi.authorization.AuthorizationRequest;
-import org.apache.nifi.authorization.AuthorizationResult;
-import org.apache.nifi.authorization.AuthorizationResult.Result;
-import org.apache.nifi.authorization.AuthorizeAccess;
-import org.apache.nifi.authorization.Authorizer;
-import org.apache.nifi.authorization.Group;
-import org.apache.nifi.authorization.RequestAction;
-import org.apache.nifi.authorization.Resource;
-import org.apache.nifi.authorization.User;
-import org.apache.nifi.authorization.UserContextKeys;
-import org.apache.nifi.authorization.resource.Authorizable;
-import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource;
-import org.apache.nifi.authorization.resource.OperationAuthorizable;
-import org.apache.nifi.authorization.resource.ResourceFactory;
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.authorization.user.NiFiUserUtils;
-import org.apache.nifi.bundle.BundleCoordinate;
-import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
-import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
-import org.apache.nifi.cluster.coordination.node.ClusterRoles;
-import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
-import org.apache.nifi.cluster.coordination.node.OffloadCode;
-import org.apache.nifi.cluster.event.NodeEvent;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
-import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.components.ConfigurableComponent;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.connectable.Funnel;
-import org.apache.nifi.connectable.Port;
-import org.apache.nifi.controller.ComponentNode;
-import org.apache.nifi.controller.Counter;
-import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.ReportingTaskNode;
-import org.apache.nifi.controller.ScheduledState;
-import org.apache.nifi.controller.Snippet;
-import org.apache.nifi.controller.Template;
-import org.apache.nifi.controller.label.Label;
-import org.apache.nifi.controller.leader.election.LeaderElectionManager;
-import org.apache.nifi.controller.repository.claim.ContentDirection;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceReference;
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.ProcessGroupCounts;
-import org.apache.nifi.groups.RemoteProcessGroup;
-import org.apache.nifi.history.History;
-import org.apache.nifi.history.HistoryQuery;
-import org.apache.nifi.history.PreviousValue;
-import org.apache.nifi.registry.ComponentVariableRegistry;
-import org.apache.nifi.registry.authorization.Permissions;
-import org.apache.nifi.registry.bucket.Bucket;
-import org.apache.nifi.registry.client.NiFiRegistryException;
-import org.apache.nifi.registry.flow.FlowRegistry;
-import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.VersionControlInformation;
-import org.apache.nifi.registry.flow.VersionedComponent;
-import org.apache.nifi.registry.flow.VersionedConnection;
-import org.apache.nifi.registry.flow.VersionedFlow;
-import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.registry.flow.VersionedFlowState;
-import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
-import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
-import org.apache.nifi.registry.flow.diff.DifferenceType;
-import org.apache.nifi.registry.flow.diff.FlowComparator;
-import org.apache.nifi.registry.flow.diff.FlowComparison;
-import org.apache.nifi.registry.flow.diff.FlowDifference;
-import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
-import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
-import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
-import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
-import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
-import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
-import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
-import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
-import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
-import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.RootGroupPort;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinQuery;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.ComponentType;
-import org.apache.nifi.util.BundleUtils;
-import org.apache.nifi.util.FlowDifferenceFilters;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.api.dto.AccessPolicyDTO;
-import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
-import org.apache.nifi.web.api.dto.AffectedComponentDTO;
-import org.apache.nifi.web.api.dto.BucketDTO;
-import org.apache.nifi.web.api.dto.BulletinBoardDTO;
-import org.apache.nifi.web.api.dto.BulletinDTO;
-import org.apache.nifi.web.api.dto.BulletinQueryDTO;
-import org.apache.nifi.web.api.dto.BundleDTO;
-import org.apache.nifi.web.api.dto.ClusterDTO;
-import org.apache.nifi.web.api.dto.ComponentDTO;
-import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
-import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
-import org.apache.nifi.web.api.dto.ComponentReferenceDTO;
-import org.apache.nifi.web.api.dto.ComponentRestrictionPermissionDTO;
-import org.apache.nifi.web.api.dto.ComponentStateDTO;
-import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
-import org.apache.nifi.web.api.dto.CounterDTO;
-import org.apache.nifi.web.api.dto.CountersDTO;
-import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
-import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
-import org.apache.nifi.web.api.dto.DropRequestDTO;
-import org.apache.nifi.web.api.dto.DtoFactory;
-import org.apache.nifi.web.api.dto.EntityFactory;
-import org.apache.nifi.web.api.dto.FlowConfigurationDTO;
-import org.apache.nifi.web.api.dto.FlowFileDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.FunnelDTO;
-import org.apache.nifi.web.api.dto.LabelDTO;
-import org.apache.nifi.web.api.dto.ListingRequestDTO;
-import org.apache.nifi.web.api.dto.NodeDTO;
-import org.apache.nifi.web.api.dto.PermissionsDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.dto.PreviousValueDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
-import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
-import org.apache.nifi.web.api.dto.RegistryDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-import org.apache.nifi.web.api.dto.ReportingTaskDTO;
-import org.apache.nifi.web.api.dto.RequiredPermissionDTO;
-import org.apache.nifi.web.api.dto.ResourceDTO;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.dto.SnippetDTO;
-import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.TemplateDTO;
-import org.apache.nifi.web.api.dto.UserDTO;
-import org.apache.nifi.web.api.dto.UserGroupDTO;
-import org.apache.nifi.web.api.dto.VariableRegistryDTO;
-import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
-import org.apache.nifi.web.api.dto.VersionedFlowDTO;
-import org.apache.nifi.web.api.dto.action.HistoryDTO;
-import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
-import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
-import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.flow.FlowDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
-import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
-import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.PortStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.entity.AccessPolicyEntity;
-import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
-import org.apache.nifi.web.api.entity.ActionEntity;
-import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
-import org.apache.nifi.web.api.entity.AffectedComponentEntity;
-import org.apache.nifi.web.api.entity.BucketEntity;
-import org.apache.nifi.web.api.entity.BulletinEntity;
-import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
-import org.apache.nifi.web.api.entity.ConnectionEntity;
-import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
-import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
-import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
-import org.apache.nifi.web.api.entity.ControllerServiceEntity;
-import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
-import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
-import org.apache.nifi.web.api.entity.CurrentUserEntity;
-import org.apache.nifi.web.api.entity.FlowComparisonEntity;
-import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
-import org.apache.nifi.web.api.entity.FlowEntity;
-import org.apache.nifi.web.api.entity.FunnelEntity;
-import org.apache.nifi.web.api.entity.LabelEntity;
-import org.apache.nifi.web.api.entity.PortEntity;
-import org.apache.nifi.web.api.entity.PortStatusEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
-import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
-import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
-import org.apache.nifi.web.api.entity.RegistryClientEntity;
-import org.apache.nifi.web.api.entity.RegistryEntity;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.ReportingTaskEntity;
-import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
-import org.apache.nifi.web.api.entity.SnippetEntity;
-import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
-import org.apache.nifi.web.api.entity.StatusHistoryEntity;
-import org.apache.nifi.web.api.entity.TemplateEntity;
-import org.apache.nifi.web.api.entity.TenantEntity;
-import org.apache.nifi.web.api.entity.UserEntity;
-import org.apache.nifi.web.api.entity.UserGroupEntity;
-import org.apache.nifi.web.api.entity.VariableEntity;
-import org.apache.nifi.web.api.entity.VariableRegistryEntity;
-import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
-import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
-import org.apache.nifi.web.api.entity.VersionedFlowEntity;
-import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
-import org.apache.nifi.web.controller.ControllerFacade;
-import org.apache.nifi.web.dao.AccessPolicyDAO;
-import org.apache.nifi.web.dao.ConnectionDAO;
-import org.apache.nifi.web.dao.ControllerServiceDAO;
-import org.apache.nifi.web.dao.FunnelDAO;
-import org.apache.nifi.web.dao.LabelDAO;
-import org.apache.nifi.web.dao.PortDAO;
-import org.apache.nifi.web.dao.ProcessGroupDAO;
-import org.apache.nifi.web.dao.ProcessorDAO;
-import org.apache.nifi.web.dao.RegistryDAO;
-import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
-import org.apache.nifi.web.dao.ReportingTaskDAO;
-import org.apache.nifi.web.dao.SnippetDAO;
-import org.apache.nifi.web.dao.TemplateDAO;
-import org.apache.nifi.web.dao.UserDAO;
-import org.apache.nifi.web.dao.UserGroupDAO;
-import org.apache.nifi.web.revision.DeleteRevisionTask;
-import org.apache.nifi.web.revision.ExpiredRevisionClaimException;
-import org.apache.nifi.web.revision.RevisionClaim;
-import org.apache.nifi.web.revision.RevisionManager;
-import org.apache.nifi.web.revision.RevisionUpdate;
-import org.apache.nifi.web.revision.StandardRevisionClaim;
-import org.apache.nifi.web.revision.StandardRevisionUpdate;
-import org.apache.nifi.web.revision.UpdateRevisionTask;
-import org.apache.nifi.web.util.SnippetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Implementation of NiFiServiceFacade that performs revision checking.
- */
-public class StandardNiFiServiceFacade implements NiFiServiceFacade {
- private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
- private static final int VALIDATION_WAIT_MILLIS = 50;
-
- // nifi core components
- private ControllerFacade controllerFacade;
- private SnippetUtils snippetUtils;
-
- // revision manager
- private RevisionManager revisionManager;
- private BulletinRepository bulletinRepository;
-
- // data access objects
- private ProcessorDAO processorDAO;
- private ProcessGroupDAO processGroupDAO;
- private RemoteProcessGroupDAO remoteProcessGroupDAO;
- private LabelDAO labelDAO;
- private FunnelDAO funnelDAO;
- private SnippetDAO snippetDAO;
- private PortDAO inputPortDAO;
- private PortDAO outputPortDAO;
- private ConnectionDAO connectionDAO;
- private ControllerServiceDAO controllerServiceDAO;
- private ReportingTaskDAO reportingTaskDAO;
- private TemplateDAO templateDAO;
- private UserDAO userDAO;
- private UserGroupDAO userGroupDAO;
- private AccessPolicyDAO accessPolicyDAO;
- private RegistryDAO registryDAO;
- private ClusterCoordinator clusterCoordinator;
- private HeartbeatMonitor heartbeatMonitor;
- private LeaderElectionManager leaderElectionManager;
-
- // administrative services
- private AuditService auditService;
-
- // flow registry
- private FlowRegistryClient flowRegistryClient;
-
- // properties
- private NiFiProperties properties;
- private DtoFactory dtoFactory;
- private EntityFactory entityFactory;
-
- private Authorizer authorizer;
-
- private AuthorizableLookup authorizableLookup;
-
- // -----------------------------------------
- // Synchronization methods
- // -----------------------------------------
- @Override
- public void authorizeAccess(final AuthorizeAccess authorizeAccess) {
- authorizeAccess.authorize(authorizableLookup);
- }
-
- @Override
- public void verifyRevision(final Revision revision, final NiFiUser user) {
- final Revision curRevision = revisionManager.getRevision(revision.getComponentId());
- if (revision.equals(curRevision)) {
- return;
- }
-
- throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified");
- }
-
- @Override
- public void verifyRevisions(final Set<Revision> revisions, final NiFiUser user) {
- for (final Revision revision : revisions) {
- verifyRevision(revision, user);
- }
- }
-
- @Override
- public Set<Revision> getRevisionsFromGroup(final String groupId, final Function<ProcessGroup, Set<String>> getComponents) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
- final Set<String> componentIds = getComponents.apply(group);
- return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet());
- }
-
- @Override
- public Set<Revision> getRevisionsFromSnippet(final String snippetId) {
- final Snippet snippet = snippetDAO.getSnippet(snippetId);
- final Set<String> componentIds = new HashSet<>();
- componentIds.addAll(snippet.getProcessors().keySet());
- componentIds.addAll(snippet.getFunnels().keySet());
- componentIds.addAll(snippet.getLabels().keySet());
- componentIds.addAll(snippet.getConnections().keySet());
- componentIds.addAll(snippet.getInputPorts().keySet());
- componentIds.addAll(snippet.getOutputPorts().keySet());
- componentIds.addAll(snippet.getProcessGroups().keySet());
- componentIds.addAll(snippet.getRemoteProcessGroups().keySet());
- return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet());
- }
-
- // -----------------------------------------
- // Verification Operations
- // -----------------------------------------
-
- @Override
- public void verifyListQueue(final String connectionId) {
- connectionDAO.verifyList(connectionId);
- }
-
- @Override
- public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) {
- connectionDAO.verifyCreate(groupId, connectionDTO);
- }
-
- @Override
- public void verifyUpdateConnection(final ConnectionDTO connectionDTO) {
- // if connection does not exist, then the update request is likely creating it
- // so we don't verify since it will fail
- if (connectionDAO.hasConnection(connectionDTO.getId())) {
- connectionDAO.verifyUpdate(connectionDTO);
- } else {
- connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO);
- }
- }
-
- @Override
- public void verifyDeleteConnection(final String connectionId) {
- connectionDAO.verifyDelete(connectionId);
- }
-
- @Override
- public void verifyDeleteFunnel(final String funnelId) {
- funnelDAO.verifyDelete(funnelId);
- }
-
- @Override
- public void verifyUpdateInputPort(final PortDTO inputPortDTO) {
- // if connection does not exist, then the update request is likely creating it
- // so we don't verify since it will fail
- if (inputPortDAO.hasPort(inputPortDTO.getId())) {
- inputPortDAO.verifyUpdate(inputPortDTO);
- }
- }
-
- @Override
- public void verifyDeleteInputPort(final String inputPortId) {
- inputPortDAO.verifyDelete(inputPortId);
- }
-
- @Override
- public void verifyUpdateOutputPort(final PortDTO outputPortDTO) {
- // if connection does not exist, then the update request is likely creating it
- // so we don't verify since it will fail
- if (outputPortDAO.hasPort(outputPortDTO.getId())) {
- outputPortDAO.verifyUpdate(outputPortDTO);
- }
- }
-
- @Override
- public void verifyDeleteOutputPort(final String outputPortId) {
- outputPortDAO.verifyDelete(outputPortId);
- }
-
- @Override
- public void verifyCreateProcessor(ProcessorDTO processorDTO) {
- processorDAO.verifyCreate(processorDTO);
- }
-
- @Override
- public void verifyUpdateProcessor(final ProcessorDTO processorDTO) {
- // if group does not exist, then the update request is likely creating it
- // so we don't verify since it will fail
- if (processorDAO.hasProcessor(processorDTO.getId())) {
- processorDAO.verifyUpdate(processorDTO);
- } else {
- verifyCreateProcessor(processorDTO);
- }
- }
-
- @Override
- public void verifyDeleteProcessor(final String processorId) {
- processorDAO.verifyDelete(processorId);
- }
-
- @Override
- public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
- processGroupDAO.verifyScheduleComponents(groupId, state, componentIds);
- }
-
- @Override
- public void verifyEnableComponents(String processGroupId, ScheduledState state, Set<String> componentIds) {
- processGroupDAO.verifyEnableComponents(processGroupId, state, componentIds);
- }
-
- @Override
- public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
- processGroupDAO.verifyActivateControllerServices(state, serviceIds);
- }
-
- @Override
- public void verifyDeleteProcessGroup(final String groupId) {
- processGroupDAO.verifyDelete(groupId);
- }
-
- @Override
- public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) {
- // if remote group does not exist, then the update request is likely creating it
- // so we don't verify since it will fail
- if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) {
- remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO);
- }
- }
-
- @Override
- public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
- remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
- }
-
- @Override
- public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
- remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
- }
-
- @Override
- public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) {
- remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
- }
-
- @Override
- public void verifyCreateControllerService(ControllerServiceDTO controllerServiceDTO) {
- controllerServiceDAO.verifyCreate(controllerServiceDTO);
- }
-
- @Override
- public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) {
- // if service does not exist, then the update request is likely creating it
- // so we don't verify since it will fail
- if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) {
- controllerServiceDAO.verifyUpdate(controllerServiceDTO);
- } else {
- verifyCreateControllerService(controllerServiceDTO);
- }
- }
-
- @Override
- public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
- controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
- }
-
- @Override
- public void verifyDeleteControllerService(final String controllerServiceId) {
- controllerServiceDAO.verifyDelete(controllerServiceId);
- }
-
- @Override
- public void verifyCreateReportingTask(ReportingTaskDTO reportingTaskDTO) {
- reportingTaskDAO.verifyCreate(reportingTaskDTO);
- }
-
- @Override
- public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) {
- // if tasks does not exist, then the update request is likely creating it
- // so we don't verify since it will fail
- if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) {
- reportingTaskDAO.verifyUpdate(reportingTaskDTO);
- } else {
- verifyCreateReportingTask(reportingTaskDTO);
- }
- }
-
- @Override
- public void verifyDeleteReportingTask(final String reportingTaskId) {
- reportingTaskDAO.verifyDelete(reportingTaskId);
- }
-
- // -----------------------------------------
- // Write Operations
- // -----------------------------------------
-
- @Override
- public AccessPolicyEntity updateAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) {
- final Authorizable authorizable = authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId());
- final RevisionUpdate<AccessPolicyDTO> snapshot = updateComponent(revision,
- authorizable,
- () -> accessPolicyDAO.updateAccessPolicy(accessPolicyDTO),
- accessPolicy -> {
- final Set<TenantEntity> users = accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
- final Set<TenantEntity> userGroups = accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
- final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
- return dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference);
- });
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizable);
- return entityFactory.createAccessPolicyEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
- }
-
- @Override
- public UserEntity updateUser(final Revision revision, final UserDTO userDTO) {
- final Authorizable usersAuthorizable = authorizableLookup.getTenant();
- final Set<Group> groups = userGroupDAO.getUserGroupsForUser(userDTO.getId());
- final Set<AccessPolicy> policies = userGroupDAO.getAccessPoliciesForUser(userDTO.getId());
- final RevisionUpdate<UserDTO> snapshot = updateComponent(revision,
- usersAuthorizable,
- () -> userDAO.updateUser(userDTO),
- user -> {
- final Set<TenantEntity> tenantEntities = groups.stream().map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
- final Set<AccessPolicySummaryEntity> policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
- return dtoFactory.createUserDto(user, tenantEntities, policyEntities);
- });
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(usersAuthorizable);
- return entityFactory.createUserEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
- }
-
- @Override
- public UserGroupEntity updateUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) {
- final Authorizable userGroupsAuthorizable = authorizableLookup.getTenant();
- final Set<AccessPolicy> policies = userGroupDAO.getAccessPoliciesForUserGroup(userGroupDTO.getId());
- final RevisionUpdate<UserGroupDTO> snapshot = updateComponent(revision,
- userGroupsAuthorizable,
- () -> userGroupDAO.updateUserGroup(userGroupDTO),
- userGroup -> {
- final Set<TenantEntity> tenantEntities = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
- final Set<AccessPolicySummaryEntity> policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
- return dtoFactory.createUserGroupDto(userGroup, tenantEntities, policyEntities);
- }
- );
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(userGroupsAuthorizable);
- return entityFactory.createUserGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
- }
-
- @Override
- public ConnectionEntity updateConnection(final Revision revision, final ConnectionDTO connectionDTO) {
- final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId());
-
- final RevisionUpdate<ConnectionDTO> snapshot = updateComponent(
- revision,
- connectionNode,
- () -> connectionDAO.updateConnection(connectionDTO),
- connection -> dtoFactory.createConnectionDto(connection));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectionNode);
- final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionNode.getIdentifier()));
- return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status);
- }
-
- @Override
- public ProcessorEntity updateProcessor(final Revision revision, final ProcessorDTO processorDTO) {
- // get the component, ensure we have access to it, and perform the update request
- final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId());
- final RevisionUpdate<ProcessorDTO> snapshot = updateComponent(revision,
- processorNode,
- () -> processorDAO.updateProcessor(processorDTO),
- proc -> {
- awaitValidationCompletion(proc);
- return dtoFactory.createProcessorDto(proc);
- });
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processorNode);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processorNode));
- final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorNode.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
- }
-
- private void awaitValidationCompletion(final ComponentNode component) {
- component.getValidationStatus(VALIDATION_WAIT_MILLIS, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public LabelEntity updateLabel(final Revision revision, final LabelDTO labelDTO) {
- final Label labelNode = labelDAO.getLabel(labelDTO.getId());
- final RevisionUpdate<LabelDTO> snapshot = updateComponent(revision,
- labelNode,
- () -> labelDAO.updateLabel(labelDTO),
- label -> dtoFactory.createLabelDto(label));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(labelNode);
- return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
- }
-
- @Override
- public FunnelEntity updateFunnel(final Revision revision, final FunnelDTO funnelDTO) {
- final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId());
- final RevisionUpdate<FunnelDTO> snapshot = updateComponent(revision,
- funnelNode,
- () -> funnelDAO.updateFunnel(funnelDTO),
- funnel -> dtoFactory.createFunnelDto(funnel));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnelNode);
- return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
- }
-
-
- /**
- * Updates a component with the given revision, using the provided supplier to call
- * into the appropriate DAO and the provided function to convert the component into a DTO.
- *
- * @param revision the current revision
- * @param daoUpdate a Supplier that will update the component via the appropriate DAO
- * @param dtoCreation a Function to convert a component into a dao
- * @param <D> the DTO Type of the updated component
- * @param <C> the Component Type of the updated component
- * @return A RevisionUpdate that represents the new configuration
- */
- private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
- try {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() {
- @Override
- public RevisionUpdate<D> update() {
- // get the updated component
- final C component = daoUpdate.get();
-
- // save updated controller
- controllerFacade.save();
-
- final D dto = dtoCreation.apply(component);
-
- final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
- final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
- return new StandardRevisionUpdate<>(dto, lastModification);
- }
- });
-
- return updatedComponent;
- } catch (final ExpiredRevisionClaimException erce) {
- throw new InvalidRevisionException("Failed to update component " + authorizable, erce);
- }
- }
-
-
- @Override
- public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set<String> affectedComponentIds) {
- // if snippet does not exist, then the update request is likely creating it
- // so we don't verify since it will fail
- if (snippetDAO.hasSnippet(snippetDto.getId())) {
- snippetDAO.verifyUpdateSnippetComponent(snippetDto);
- }
- }
-
- @Override
- public SnippetEntity updateSnippet(final Set<Revision> revisions, final SnippetDTO snippetDto) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
-
- final RevisionUpdate<SnippetDTO> snapshot;
- try {
- snapshot = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() {
- @Override
- public RevisionUpdate<SnippetDTO> update() {
- // get the updated component
- final Snippet snippet = snippetDAO.updateSnippetComponents(snippetDto);
-
- // drop the snippet
- snippetDAO.dropSnippet(snippet.getId());
-
- // save updated controller
- controllerFacade.save();
-
- // increment the revisions
- final Set<Revision> updatedRevisions = revisions.stream().map(revision -> {
- final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
- return currentRevision.incrementRevision(revision.getClientId());
- }).collect(Collectors.toSet());
-
- final SnippetDTO dto = dtoFactory.createSnippetDto(snippet);
- return new StandardRevisionUpdate<>(dto, null, updatedRevisions);
- }
- });
- } catch (final ExpiredRevisionClaimException e) {
- throw new InvalidRevisionException("Failed to update Snippet", e);
- }
-
- return entityFactory.createSnippetEntity(snapshot.getComponent());
- }
-
- @Override
- public PortEntity updateInputPort(final Revision revision, final PortDTO inputPortDTO) {
- final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId());
- final RevisionUpdate<PortDTO> snapshot = updateComponent(revision,
- inputPortNode,
- () -> inputPortDAO.updatePort(inputPortDTO),
- port -> dtoFactory.createPortDto(port));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPortNode);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPortNode));
- final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortNode.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public PortEntity updateOutputPort(final Revision revision, final PortDTO outputPortDTO) {
- final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId());
- final RevisionUpdate<PortDTO> snapshot = updateComponent(revision,
- outputPortNode,
- () -> outputPortDAO.updatePort(outputPortDTO),
- port -> dtoFactory.createPortDto(port));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPortNode);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPortNode), NiFiUserUtils.getNiFiUser());
- final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortNode.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public RemoteProcessGroupEntity updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
- final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
- final RevisionUpdate<RemoteProcessGroupDTO> snapshot = updateComponent(
- revision,
- remoteProcessGroupNode,
- () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO),
- remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
- final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
- final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupNode,
- controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroupNode.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort(
- final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
-
- final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId());
- final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent(
- revision,
- remoteProcessGroupNode,
- () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO),
- remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
- final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
- return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions);
- }
-
- @Override
- public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort(
- final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
-
- final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId());
- final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent(
- revision,
- remoteProcessGroupNode,
- () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO),
- remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode));
- final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
- return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions);
- }
-
- @Override
- public Set<AffectedComponentDTO> getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
- if (group == null) {
- throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
- }
-
- final Map<String, String> variableMap = new HashMap<>();
- variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
- .map(VariableEntity::getVariable)
- .forEach(var -> variableMap.put(var.getName(), var.getValue()));
-
- final Set<AffectedComponentDTO> affectedComponentDtos = new HashSet<>();
-
- final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap);
- for (final String variableName : updatedVariableNames) {
- final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName);
-
- for (final ComponentNode component : affectedComponents) {
- if (component instanceof ProcessorNode) {
- final ProcessorNode procNode = (ProcessorNode) component;
- if (procNode.isRunning()) {
- affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode));
- }
- } else if (component instanceof ControllerServiceNode) {
- final ControllerServiceNode serviceNode = (ControllerServiceNode) component;
- if (serviceNode.isActive()) {
- affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode));
- }
- } else {
- throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable");
- }
- }
- }
-
- return affectedComponentDtos;
- }
-
- @Override
- public Set<AffectedComponentEntity> getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
- if (group == null) {
- throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
- }
-
- final Map<String, String> variableMap = new HashMap<>();
- variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
- .map(VariableEntity::getVariable)
- .forEach(var -> variableMap.put(var.getName(), var.getValue()));
-
- final Set<AffectedComponentEntity> affectedComponentEntities = new HashSet<>();
-
- final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap);
- for (final String variableName : updatedVariableNames) {
- final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName);
- affectedComponentEntities.addAll(dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager));
- }
-
- return affectedComponentEntities;
- }
-
- private Set<String> getUpdatedVariables(final ProcessGroup group, final Map<String, String> newVariableValues) {
- final Set<String> updatedVariableNames = new HashSet<>();
-
- final ComponentVariableRegistry registry = group.getVariableRegistry();
- for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) {
- final String varName = entry.getKey();
- final String newValue = entry.getValue();
-
- final String curValue = registry.getVariableValue(varName);
- if (!Objects.equals(newValue, curValue)) {
- updatedVariableNames.add(varName);
- }
- }
-
- return updatedVariableNames;
- }
-
-
- @Override
- public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) {
- final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
- final RevisionUpdate<VariableRegistryDTO> snapshot = updateComponent(revision,
- processGroupNode,
- () -> processGroupDAO.updateVariableRegistry(variableRegistryDto),
- processGroup -> dtoFactory.createVariableRegistryDto(processGroup, revisionManager));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
- final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
- return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions);
- }
-
-
- @Override
- public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) {
- final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId());
- final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision,
- processGroupNode,
- () -> processGroupDAO.updateProcessGroup(processGroupDTO),
- processGroup -> dtoFactory.createProcessGroupDto(processGroup));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
- final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
- final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
- }
-
- @Override
- public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
- if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
- processGroupDAO.verifyUpdate(processGroupDTO);
- }
- }
-
- @Override
- public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
- UpdateRevisionTask<ScheduleComponentsEntity>() {
- @Override
- public RevisionUpdate<ScheduleComponentsEntity> update() {
- // schedule the components
- processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet());
-
- // update the revisions
- final Map<String, Revision> updatedRevisions = new HashMap<>();
- for (final Revision revision : componentRevisions.values()) {
- final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
- updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
- }
-
- // save
- controllerFacade.save();
-
- // gather details for response
- final ScheduleComponentsEntity entity = new ScheduleComponentsEntity();
- entity.setId(processGroupId);
- entity.setState(state.name());
- return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
- }
- });
-
- return updatedComponent.getComponent();
- }
-
- @Override
- public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
- UpdateRevisionTask<ScheduleComponentsEntity>() {
- @Override
- public RevisionUpdate<ScheduleComponentsEntity> update() {
- // schedule the components
- processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
-
- // update the revisions
- final Map<String, Revision> updatedRevisions = new HashMap<>();
- for (final Revision revision : componentRevisions.values()) {
- final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
- updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
- }
-
- // save
- controllerFacade.save();
-
- // gather details for response
- final ScheduleComponentsEntity entity = new ScheduleComponentsEntity();
- entity.setId(processGroupId);
- entity.setState(state.name());
- return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
- }
- });
-
- return updatedComponent.getComponent();
- }
-
- @Override
- public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final RevisionUpdate<ActivateControllerServicesEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user,
- new UpdateRevisionTask<ActivateControllerServicesEntity>() {
- @Override
- public RevisionUpdate<ActivateControllerServicesEntity> update() {
- // schedule the components
- processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet());
-
- // update the revisions
- final Map<String, Revision> updatedRevisions = new HashMap<>();
- for (final Revision revision : serviceRevisions.values()) {
- final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
- updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
- }
-
- // save
- controllerFacade.save();
-
- // gather details for response
- final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity();
- entity.setId(processGroupId);
- entity.setState(state.name());
- return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
- }
- });
-
- return updatedComponent.getComponent();
- }
-
-
- @Override
- public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
- final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = updateComponent(
- revision,
- controllerFacade,
- () -> {
- if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
- controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
- }
- if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
- controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
- }
-
- return controllerConfigurationDTO;
- },
- controller -> dtoFactory.createControllerConfigurationDto(controllerFacade));
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade);
- final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(updatedComponent.getLastModification());
- return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions);
- }
-
-
- @Override
- public NodeDTO updateNode(final NodeDTO nodeDTO) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- if (user == null) {
- throw new WebApplicationException(new Throwable("Unable to access details for current user."));
- }
- final String userDn = user.getIdentity();
-
- final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeDTO.getNodeId());
- if (nodeId == null) {
- throw new UnknownNodeException("No node exists with ID " + nodeDTO.getNodeId());
- }
-
-
- if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
- clusterCoordinator.requestNodeConnect(nodeId, userDn);
- } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
- clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED,
- "User " + userDn + " requested that node be offloaded");
- } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
- clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED,
- "User " + userDn + " requested that node be disconnected from cluster");
- }
-
- return getNode(nodeId);
- }
-
- @Override
- public CounterDTO updateCounter(final String counterId) {
- return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId));
- }
-
- @Override
- public void verifyCanClearProcessorState(final String processorId) {
- processorDAO.verifyClearState(processorId);
- }
-
- @Override
- public void clearProcessorState(final String processorId) {
- processorDAO.clearState(processorId);
- }
-
- @Override
- public void verifyCanClearControllerServiceState(final String controllerServiceId) {
- controllerServiceDAO.verifyClearState(controllerServiceId);
- }
-
- @Override
- public void clearControllerServiceState(final String controllerServiceId) {
- controllerServiceDAO.clearState(controllerServiceId);
- }
-
- @Override
- public void verifyCanClearReportingTaskState(final String reportingTaskId) {
- reportingTaskDAO.verifyClearState(reportingTaskId);
- }
-
- @Override
- public void clearReportingTaskState(final String reportingTaskId) {
- reportingTaskDAO.clearState(reportingTaskId);
- }
-
- @Override
- public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) {
- final Connection connection = connectionDAO.getConnection(connectionId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
- final ConnectionDTO snapshot = deleteComponent(
- revision,
- connection.getResource(),
- () -> connectionDAO.deleteConnection(connectionId),
- false, // no policies to remove
- dtoFactory.createConnectionDto(connection));
-
- return entityFactory.createConnectionEntity(snapshot, null, permissions, null);
- }
-
- @Override
- public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) {
- return dtoFactory.createDropRequestDTO(connectionDAO.deleteFlowFileDropRequest(connectionId, dropRequestId));
- }
-
- @Override
- public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) {
- final Connection connection = connectionDAO.getConnection(connectionId);
- final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(connectionId, listingRequestId));
-
- // include whether the source and destination are running
- if (connection.getSource() != null) {
- listRequest.setSourceRunning(connection.getSource().isRunning());
- }
- if (connection.getDestination() != null) {
- listRequest.setDestinationRunning(connection.getDestination().isRunning());
- }
-
- return listRequest;
- }
-
- @Override
- public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) {
- final ProcessorNode processor = processorDAO.getProcessor(processorId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
- final ProcessorDTO snapshot = deleteComponent(
- revision,
- processor.getResource(),
- () -> processorDAO.deleteProcessor(processorId),
- true,
- dtoFactory.createProcessorDto(processor));
-
- return entityFactory.createProcessorEntity(snapshot, null, permissions, operatePermissions, null, null);
- }
-
- @Override
- public ProcessorEntity terminateProcessor(final String processorId) {
- processorDAO.terminate(processorId);
- return getProcessor(processorId);
- }
-
- @Override
- public void verifyTerminateProcessor(final String processorId) {
- processorDAO.verifyTerminate(processorId);
- }
-
- @Override
- public LabelEntity deleteLabel(final Revision revision, final String labelId) {
- final Label label = labelDAO.getLabel(labelId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
- final LabelDTO snapshot = deleteComponent(
- revision,
- label.getResource(),
- () -> labelDAO.deleteLabel(labelId),
- true,
- dtoFactory.createLabelDto(label));
-
- return entityFactory.createLabelEntity(snapshot, null, permissions);
- }
-
- @Override
- public UserEntity deleteUser(final Revision revision, final String userId) {
- final User user = userDAO.getUser(userId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
- final Set<TenantEntity> userGroups = user != null ? userGroupDAO.getUserGroupsForUser(userId).stream()
- .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
- final Set<AccessPolicySummaryEntity> policyEntities = user != null ? userGroupDAO.getAccessPoliciesForUser(userId).stream()
- .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()) : null;
-
- final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userId;
- final UserDTO snapshot = deleteComponent(
- revision,
- new Resource() {
- @Override
- public String getIdentifier() {
- return resourceIdentifier;
- }
-
- @Override
- public String getName() {
- return resourceIdentifier;
- }
-
- @Override
- public String getSafeDescription() {
- return "User " + userId;
- }
- },
- () -> userDAO.deleteUser(userId),
- false, // no user specific policies to remove
- dtoFactory.createUserDto(user, userGroups, policyEntities));
-
- return entityFactory.createUserEntity(snapshot, null, permissions);
- }
-
- @Override
- public UserGroupEntity deleteUserGroup(final Revision revision, final String userGroupId) {
- final Group userGroup = userGroupDAO.getUserGroup(userGroupId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
- final Set<TenantEntity> users = userGroup != null ? userGroup.getUsers().stream()
- .map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
- final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream()
- .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
-
- final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userGroupId;
- final UserGroupDTO snapshot = deleteComponent(
- revision,
- new Resource() {
- @Override
- public String getIdentifier() {
- return resourceIdentifier;
- }
-
- @Override
- public String getName() {
- return resourceIdentifier;
- }
-
- @Override
- public String getSafeDescription() {
- return "User Group " + userGroupId;
- }
- },
- () -> userGroupDAO.deleteUserGroup(userGroupId),
- false, // no user group specific policies to remove
- dtoFactory.createUserGroupDto(userGroup, users, policyEntities));
-
- return entityFactory.createUserGroupEntity(snapshot, null, permissions);
- }
-
- @Override
- public AccessPolicyEntity deleteAccessPolicy(final Revision revision, final String accessPolicyId) {
- final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId);
- final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyId));
- final Set<TenantEntity> userGroups = accessPolicy != null ? accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
- final Set<TenantEntity> users = accessPolicy != null ? accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
- final AccessPolicyDTO snapshot = deleteComponent(
- revision,
- new Resource() {
- @Override
- public String getIdentifier() {
- return accessPolicy.getResource();
- }
-
- @Override
- public String getName() {
- return accessPolicy.getResource();
- }
-
- @Override
- public String getSafeDescription() {
- return "Policy " + accessPolicyId;
- }
- },
- () -> accessPolicyDAO.deleteAccessPolicy(accessPolicyId),
- false, // no need to clean up any policies as it's already been removed above
- dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference));
-
- return entityFactory.createAccessPolicyEntity(snapshot, null, permissions);
- }
-
- @Override
- public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) {
- final Funnel funnel = funnelDAO.getFunnel(funnelId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
- final FunnelDTO snapshot = deleteComponent(
- revision,
- funnel.getResource(),
- () -> funnelDAO.deleteFunnel(funnelId),
- true,
- dtoFactory.createFunnelDto(funnel));
-
- return entityFactory.createFunnelEntity(snapshot, null, permissions);
- }
-
- /**
- * Deletes a component using the Optimistic Locking Manager
- *
- * @param revision the current revision
- * @param resource the resource being removed
- * @param deleteAction the action that deletes the component via the appropriate DAO object
- * @param cleanUpPolicies whether or not the policies for this resource should be removed as well - not necessary when there are
- * no component specific policies or if the policies of the component are inherited
- * @return a dto that represents the new configuration
- */
- private <D, C> D deleteComponent(final Revision revision, final Resource resource, final Runnable deleteAction, final boolean cleanUpPolicies, final D dto) {
- final RevisionClaim claim = new StandardRevisionClaim(revision);
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<D>() {
- @Override
- public D performTask() {
- logger.debug("Attempting to delete component {} with claim {}", resource.getIdentifier(), claim);
-
- // run the delete action
- deleteAction.run();
-
- // save the flow
- controllerFacade.save();
- logger.debug("Deletion of component {} was successful", resource.getIdentifier());
-
- if (cleanUpPolicies) {
- cleanUpPolicies(resource);
- }
-
- return dto;
- }
- });
- }
-
- /**
- * Clean up the policies for the specified component resource.
- *
- * @param componentResource the resource for the component
- */
- private void cleanUpPolicies(final Resource componentResource) {
- // ensure the authorizer supports configuration
- if (accessPolicyDAO.supportsConfigurableAuthorizer()) {
- final List<Resource> resources = new ArrayList<>();
- resources.add(componentResource);
- resources.add(ResourceFactory.getDataResource(componentResource));
- resources.add(ResourceFactory.getProvenanceDataResource(componentResource));
- resources.add(ResourceFactory.getDataTransferResource(componentResource));
- resources.add(ResourceFactory.getPolicyResource(componentResource));
-
- for (final Resource resource : resources) {
- for (final RequestAction action : RequestAction.values()) {
- try {
- // since the component is being deleted, also delete any relevant access policies
- final AccessPolicy readPolicy = accessPolicyDAO.getAccessPolicy(action, resource.getIdentifier());
- if (readPolicy != null) {
- accessPolicyDAO.deleteAccessPolicy(readPolicy.getIdentifier());
- }
- } catch (final Exception e) {
- logger.warn(String.format("Unable to remove access policy for %s %s after component removal.", action, resource.getIdentifier()), e);
- }
- }
- }
- }
- }
-
- @Override
- public void verifyDeleteSnippet(final String snippetId, final Set<String> affectedComponentIds) {
- snippetDAO.verifyDeleteSnippetComponents(snippetId);
- }
-
- @Override
- public SnippetEntity deleteSnippet(final Set<Revision> revisions, final String snippetId) {
- final Snippet snippet = snippetDAO.getSnippet(snippetId);
-
- // grab the resources in the snippet so we can delete the policies afterwards
- final Set<Resource> snippetResources = new HashSet<>();
- snippet.getProcessors().keySet().forEach(id -> snippetResources.add(processorDAO.getProcessor(id).getResource()));
- snippet.getInputPorts().keySet().forEach(id -> snippetResources.add(inputPortDAO.getPort(id).getResource()));
- snippet.getOutputPorts().keySet().forEach(id -> snippetResources.add(outputPortDAO.getPort(id).getResource()));
- snippet.getFunnels().keySet().forEach(id -> snippetResources.add(funnelDAO.getFunnel(id).getResource()));
- snippet.getLabels().keySet().forEach(id -> snippetResources.add(labelDAO.getLabel(id).getResource()));
- snippet.getRemoteProcessGroups().keySet().forEach(id -> snippetResources.add(remoteProcessGroupDAO.getRemoteProcessGroup(id).getResource()));
- snippet.getProcessGroups().keySet().forEach(id -> {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id);
-
- // add the process group
- snippetResources.add(processGroup.getResource());
-
- // add each encapsulated component
- processGroup.findAllProcessors().forEach(processor -> snippetResources.add(processor.getResource()));
- processGroup.findAllInputPorts().forEach(inputPort -> snippetResources.add(inputPort.getResource()));
- processGroup.findAllOutputPorts().forEach(outputPort -> snippetResources.add(outputPort.getResource()));
- processGroup.findAllFunnels().forEach(funnel -> snippetResources.add(funnel.getResource()));
- processGroup.findAllLabels().forEach(label -> snippetResources.add(label.getResource()));
- processGroup.findAllProcessGroups().forEach(childGroup -> snippetResources.add(childGroup.getResource()));
- processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> snippetResources.add(remoteProcessGroup.getResource()));
- processGroup.findAllTemplates().forEach(template -> snippetResources.add(template.getResource()));
- processGroup.findAllControllerServices().forEach(controllerService -> snippetResources.add(controllerService.getResource()));
- });
-
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final RevisionClaim claim = new StandardRevisionClaim(revisions);
- final SnippetDTO dto = revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<SnippetDTO>() {
- @Override
- public SnippetDTO performTask() {
- // delete the components in the snippet
- snippetDAO.deleteSnippetComponents(snippetId);
-
- // drop the snippet
- snippetDAO.dropSnippet(snippetId);
-
- // save
- controllerFacade.save();
-
- // create the dto for the snippet that was just removed
- return dtoFactory.createSnippetDto(snippet);
- }
- });
-
- // clean up component policies
- snippetResources.forEach(resource -> cleanUpPolicies(resource));
-
- return entityFactory.createSnippetEntity(dto);
- }
-
- @Override
- public PortEntity deleteInputPort(final Revision revision, final String inputPortId) {
- final Port port = inputPortDAO.getPort(inputPortId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
- final PortDTO snapshot = deleteComponent(
- revision,
- port.getResource(),
- () -> inputPortDAO.deletePort(inputPortId),
- true,
- dtoFactory.createPortDto(port));
-
- return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null);
- }
-
- @Override
- public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) {
- final Port port = outputPortDAO.getPort(outputPortId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
- final PortDTO snapshot = deleteComponent(
- revision,
- port.getResource(),
- () -> outputPortDAO.deletePort(outputPortId),
- true,
- dtoFactory.createPortDto(port));
-
- return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null);
- }
-
- @Override
- public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
-
- // grab the resources in the snippet so we can delete the policies afterwards
- final Set<Resource> groupResources = new HashSet<>();
- processGroup.findAllProcessors().forEach(processor -> groupResources.add(processor.getResource()));
- processGroup.findAllInputPorts().forEach(inputPort -> groupResources.add(inputPort.getResource()));
- processGroup.findAllOutputPorts().forEach(outputPort -> groupResources.add(outputPort.getResource()));
- processGroup.findAllFunnels().forEach(funnel -> groupResources.add(funnel.getResource()));
- processGroup.findAllLabels().forEach(label -> groupResources.add(label.getResource()));
- processGroup.findAllProcessGroups().forEach(childGroup -> groupResources.add(childGroup.getResource()));
- processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> groupResources.add(remoteProcessGroup.getResource()));
- processGroup.findAllTemplates().forEach(template -> groupResources.add(template.getResource()));
- processGroup.findAllControllerServices().forEach(controllerService -> groupResources.add(controllerService.getResource()));
-
- final ProcessGroupDTO snapshot = deleteComponent(
- revision,
- processGroup.getResource(),
- () -> processGroupDAO.deleteProcessGroup(groupId),
- true,
- dtoFactory.createProcessGroupDto(processGroup));
-
- // delete all applicable component policies
- groupResources.forEach(groupResource -> cleanUpPolicies(groupResource));
-
- return entityFactory.createProcessGroupEntity(snapshot, null, permissions, null, null);
- }
-
- @Override
- public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) {
- final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup));
- final RemoteProcessGroupDTO snapshot = deleteComponent(
- revision,
- remoteProcessGroup.getResource(),
- () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId),
- true,
- dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
-
- return entityFactory.createRemoteProcessGroupEntity(snapshot, null, permissions, operatePermissions, null, null);
- }
-
- @Override
- public void deleteTemplate(final String id) {
- // delete the template and save the flow
- templateDAO.deleteTemplate(id);
- controllerFacade.save();
- }
-
- @Override
- public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
- final RevisionUpdate<ConnectionDTO> snapshot = createComponent(
- revision,
- connectionDTO,
- () -> connectionDAO.createConnection(groupId, connectionDTO),
- connection -> dtoFactory.createConnectionDto(connection));
-
- final Connection connection = connectionDAO.getConnection(connectionDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
- final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId()));
- return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status);
- }
-
- @Override
- public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) {
- return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId));
- }
-
- @Override
- public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) {
- final Connection connection = connectionDAO.getConnection(connectionId);
- final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(connectionId, listingRequestId));
-
- // include whether the source and destination are running
- if (connection.getSource() != null) {
- listRequest.setSourceRunning(connection.getSource().isRunning());
- }
- if (connection.getDestination() != null) {
- listRequest.setDestinationRunning(connection.getDestination().isRunning());
- }
-
- return listRequest;
- }
-
- @Override
- public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
- final RevisionUpdate<ProcessorDTO> snapshot = createComponent(
- revision,
- processorDTO,
- () -> processorDAO.createProcessor(groupId, processorDTO),
- processor -> {
- awaitValidationCompletion(processor);
- return dtoFactory.createProcessorDto(processor);
- });
-
- final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
- final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
- final RevisionUpdate<LabelDTO> snapshot = createComponent(
- revision,
- labelDTO,
- () -> labelDAO.createLabel(groupId, labelDTO),
- label -> dtoFactory.createLabelDto(label));
-
- final Label label = labelDAO.getLabel(labelDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
- return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
- }
-
- /**
- * Creates a component using the optimistic locking manager.
- *
- * @param componentDto the DTO that will be used to create the component
- * @param daoCreation A Supplier that will create the NiFi Component to use
- * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO
- * @param <D> the DTO Type
- * @param <C> the NiFi Component Type
- * @return a RevisionUpdate that represents the updated configuration
- */
- private <D, C> RevisionUpdate<D> createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier<C> daoCreation, final Function<C, D> dtoCreation) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- // read lock on the containing group
- // request claim for component to be created... revision already verified (version == 0)
- final RevisionClaim claim = new StandardRevisionClaim(revision);
-
- // update revision through revision manager
- return revisionManager.updateRevision(claim, user, () -> {
- // add the component
- final C component = daoCreation.get();
-
- // save the flow
- controllerFacade.save();
-
- final D dto = dtoCreation.apply(component);
- final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<>(dto, lastMod);
- });
- }
-
- @Override
- public BulletinEntity createBulletin(final BulletinDTO bulletinDTO, final Boolean canRead){
- final Bulletin bulletin = BulletinFactory.createBulletin(bulletinDTO.getCategory(),bulletinDTO.getLevel(),bulletinDTO.getMessage());
- bulletinRepository.addBulletin(bulletin);
- return entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin),canRead);
- }
-
- @Override
- public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
- final RevisionUpdate<FunnelDTO> snapshot = createComponent(
- revision,
- funnelDTO,
- () -> funnelDAO.createFunnel(groupId, funnelDTO),
- funnel -> dtoFactory.createFunnelDto(funnel));
-
- final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
- return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions);
- }
-
- @Override
- public AccessPolicyEntity createAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) {
- final Authorizable tenantAuthorizable = authorizableLookup.getTenant();
- final String creator = NiFiUserUtils.getNiFiUserIdentity();
-
- final AccessPolicy newAccessPolicy = accessPolicyDAO.createAccessPolicy(accessPolicyDTO);
- final ComponentReferenceEntity componentReference = createComponentReferenceEntity(newAccessPolicy.getResource());
- final AccessPolicyDTO newAccessPolicyDto = dtoFactory.createAccessPolicyDto(newAccessPolicy,
- newAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()),
- newAccessPolicy.getUsers().stream().map(userId -> {
- final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId));
- return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(userDAO.getUser(userId)), userRevision,
- dtoFactory.createPermissionsDto(tenantAuthorizable));
- }).collect(Collectors.toSet()), componentReference);
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId()));
- return entityFactory.createAccessPolicyEntity(newAccessPolicyDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
- }
-
- @Override
- public UserEntity createUser(final Revision revision, final UserDTO userDTO) {
- final String creator = NiFiUserUtils.getNiFiUserIdentity();
- final User newUser = userDAO.createUser(userDTO);
- final Set<TenantEntity> tenantEntities = userGroupDAO.getUserGroupsForUser(newUser.getIdentifier()).stream()
- .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
- final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUser(newUser.getIdentifier()).stream()
- .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
- final UserDTO newUserDto = dtoFactory.createUserDto(newUser, tenantEntities, policyEntities);
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
- return entityFactory.createUserEntity(newUserDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
- }
-
- private ComponentReferenceEntity createComponentReferenceEntity(final String resource) {
- ComponentReferenceEntity componentReferenceEntity = null;
- try {
- // get the component authorizable
- Authorizable componentAuthorizable = authorizableLookup.getAuthorizableFromResource(resource);
-
- // if this represents an authorizable whose policy permissions are enforced through the base resource,
- // get the underlying base authorizable for the component reference
- if (componentAuthorizable instanceof EnforcePolicyPermissionsThroughBaseResource) {
- componentAuthorizable = ((EnforcePolicyPermissionsThroughBaseResource) componentAuthorizable).getBaseAuthorizable();
- }
-
- final ComponentReferenceDTO componentReference = dtoFactory.createComponentReferenceDto(componentAuthorizable);
- if (componentReference != null) {
- final PermissionsDTO componentReferencePermissions = dtoFactory.createPermissionsDto(componentAuthorizable);
- final RevisionDTO componentReferenceRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(componentReference.getId()));
- componentReferenceEntity = entityFactory.createComponentReferenceEntity(componentReference, componentReferenceRevision, componentReferencePermissions);
- }
- } catch (final ResourceNotFoundException e) {
- // component not found for the specified resource
- }
-
- return componentReferenceEntity;
- }
-
- private AccessPolicySummaryEntity createAccessPolicySummaryEntity(final AccessPolicy ap) {
- final ComponentReferenceEntity componentReference = createComponentReferenceEntity(ap.getResource());
- final AccessPolicySummaryDTO apSummary = dtoFactory.createAccessPolicySummaryDto(ap, componentReference);
- final PermissionsDTO apPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(ap.getIdentifier()));
- final RevisionDTO apRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(ap.getIdentifier()));
- return entityFactory.createAccessPolicySummaryEntity(apSummary, apRevision, apPermissions);
- }
-
- @Override
- public UserGroupEntity createUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) {
- final String creator = NiFiUserUtils.getNiFiUserIdentity();
- final Group newUserGroup = userGroupDAO.createUserGroup(userGroupDTO);
- final Set<TenantEntity> tenantEntities = newUserGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
- final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(newUserGroup.getIdentifier()).stream()
- .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
- final UserGroupDTO newUserGroupDto = dtoFactory.createUserGroupDto(newUserGroup, tenantEntities, policyEntities);
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
- return entityFactory.createUserGroupEntity(newUserGroupDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions);
- }
-
- private void validateSnippetContents(final FlowSnippetDTO flow) {
- // validate any processors
- if (flow.getProcessors() != null) {
- for (final ProcessorDTO processorDTO : flow.getProcessors()) {
- final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId());
- processorDTO.setValidationStatus(processorNode.getValidationStatus().name());
-
- final Collection<ValidationResult> validationErrors = processorNode.getValidationErrors();
- if (validationErrors != null && !validationErrors.isEmpty()) {
- final List<String> errors = new ArrayList<>();
- for (final ValidationResult validationResult : validationErrors) {
- errors.add(validationResult.toString());
- }
- processorDTO.setValidationErrors(errors);
- }
- }
- }
-
- if (flow.getInputPorts() != null) {
- for (final PortDTO portDTO : flow.getInputPorts()) {
- final Port port = inputPortDAO.getPort(portDTO.getId());
- final Collection<ValidationResult> validationErrors = port.getValidationErrors();
- if (validationErrors != null && !validationErrors.isEmpty()) {
- final List<String> errors = new ArrayList<>();
- for (final ValidationResult validationResult : validationErrors) {
- errors.add(validationResult.toString());
- }
- portDTO.setValidationErrors(errors);
- }
- }
- }
-
- if (flow.getOutputPorts() != null) {
- for (final PortDTO portDTO : flow.getOutputPorts()) {
- final Port port = outputPortDAO.getPort(portDTO.getId());
- final Collection<ValidationResult> validationErrors = port.getValidationErrors();
- if (validationErrors != null && !validationErrors.isEmpty()) {
- final List<String> errors = new ArrayList<>();
- for (final ValidationResult validationResult : validationErrors) {
- errors.add(validationResult.toString());
- }
- portDTO.setValidationErrors(errors);
- }
- }
- }
-
- // get any remote process group issues
- if (flow.getRemoteProcessGroups() != null) {
- for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) {
- final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
-
- if (remoteProcessGroup.getAuthorizationIssue() != null) {
- remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue()));
- }
- }
- }
- }
-
- @Override
- public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) {
- // create the new snippet
- final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed);
-
- // save the flow
- controllerFacade.save();
-
- // drop the snippet
- snippetDAO.dropSnippet(snippetId);
-
- // post process new flow snippet
- final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet);
-
- final FlowEntity flowEntity = new FlowEntity();
- flowEntity.setFlow(flowDto);
- return flowEntity;
- }
-
- @Override
- public SnippetEntity createSnippet(final SnippetDTO snippetDTO) {
- // add the component
- final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
-
- // save the flow
- controllerFacade.save();
-
- final SnippetDTO dto = dtoFactory.createSnippetDto(snippet);
- final RevisionUpdate<SnippetDTO> snapshot = new StandardRevisionUpdate<>(dto, null);
-
- return entityFactory.createSnippetEntity(snapshot.getComponent());
- }
-
- @Override
- public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
- final RevisionUpdate<PortDTO> snapshot = createComponent(
- revision,
- inputPortDTO,
- () -> inputPortDAO.createPort(groupId, inputPortDTO),
- port -> dtoFactory.createPortDto(port));
-
- final Port port = inputPortDAO.getPort(inputPortDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
- final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
- final RevisionUpdate<PortDTO> snapshot = createComponent(
- revision,
- outputPortDTO,
- () -> outputPortDAO.createPort(groupId, outputPortDTO),
- port -> dtoFactory.createPortDto(port));
-
- final Port port = outputPortDAO.getPort(outputPortDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port));
- final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public ProcessGroupEntity createProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
- final RevisionUpdate<ProcessGroupDTO> snapshot = createComponent(
- revision,
- processGroupDTO,
- () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO),
- processGroup -> dtoFactory.createProcessGroupDto(processGroup));
-
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
- final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities);
- }
-
- @Override
- public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
- final RevisionUpdate<RemoteProcessGroupDTO> snapshot = createComponent(
- revision,
- remoteProcessGroupDTO,
- () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO),
- remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
-
- final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup));
- final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()),
- permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) {
- final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
- RemoteGroupPort port = rpg.getInputPort(remotePortId);
- if (port != null) {
- return port.hasIncomingConnection();
- }
-
- port = rpg.getOutputPort(remotePortId);
- if (port != null) {
- return !port.getConnections().isEmpty();
- }
-
- throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId);
- }
-
- @Override
- public void verifyCanAddTemplate(String groupId, String name) {
- templateDAO.verifyCanAddTemplate(name, groupId);
- }
-
- @Override
- public void verifyComponentTypes(FlowSnippetDTO snippet) {
- templateDAO.verifyComponentTypes(snippet);
- }
-
- @Override
- public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) {
- controllerFacade.verifyComponentTypes(versionedGroup);
- }
-
- @Override
- public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
- verifyImportProcessGroup(versionControlInfo, contents, group);
- }
-
- private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) {
- if (group == null) {
- return;
- }
-
- final VersionControlInformation vci = group.getVersionControlInformation();
- if (vci != null) {
- // Note that we do not compare the Registry ID here because there could be two registry clients
- // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance)..
- if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
- && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) {
-
- throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
- + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A.");
- }
- }
-
- final Set<VersionedProcessGroup> childGroups = contents.getProcessGroups();
- if (childGroups != null) {
- for (final VersionedProcessGroup childGroup : childGroups) {
- final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates();
- if (childCoordinates != null) {
- final VersionControlInformationDTO childVci = new VersionControlInformationDTO();
- childVci.setBucketId(childCoordinates.getBucketId());
- childVci.setFlowId(childCoordinates.getFlowId());
- verifyImportProcessGroup(childVci, childGroup, group);
- }
- }
- }
-
- verifyImportProcessGroup(vciDto, contents, group.getParent());
- }
-
- @Override
- public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) {
- // get the specified snippet
- final Snippet snippet = snippetDAO.getSnippet(snippetId);
-
- // create the template
- final TemplateDTO templateDTO = new TemplateDTO();
- templateDTO.setName(name);
- templateDTO.setDescription(description);
- templateDTO.setTimestamp(new Date());
- templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true, true));
- templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION);
-
- // set the id based on the specified seed
- final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
- templateDTO.setId(uuid);
-
- // create the template
- final Template template = templateDAO.createTemplate(templateDTO, groupId);
-
- // drop the snippet
- snippetDAO.dropSnippet(snippetId);
-
- // save the flow
- controllerFacade.save();
-
- return dtoFactory.createTemplateDTO(template);
- }
-
- /**
- * Ensures default values are populated for all components in this snippet. This is necessary to handle old templates without default values
- * and when existing properties have default values introduced.
- *
- * @param snippet snippet
- */
- private void ensureDefaultPropertyValuesArePopulated(final FlowSnippetDTO snippet) {
- if (snippet != null) {
- if (snippet.getControllerServices() != null) {
- snippet.getControllerServices().forEach(dto -> {
- if (dto.getProperties() == null) {
- dto.setProperties(new LinkedHashMap<>());
- }
-
- try {
- final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle());
- configurableComponent.getPropertyDescriptors().forEach(descriptor -> {
- if (dto.getProperties().get(descriptor.getName()) == null) {
- dto.getProperties().put(descriptor.getName(), descriptor.getDefaultValue());
- }
- });
- } catch (final Exception e) {
- logger.warn(String.format("Unable to create ControllerService of type %s to populate default values.", dto.getType()));
- }
- });
- }
-
- if (snippet.getProcessors() != null) {
- snippet.getProcessors().forEach(dto -> {
- if (dto.getConfig() == null) {
- dto.setConfig(new ProcessorConfigDTO());
- }
-
- final ProcessorConfigDTO config = dto.getConfig();
- if (config.getProperties() == null) {
- config.setProperties(new LinkedHashMap<>());
- }
-
- try {
- final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle());
- configurableComponent.getPropertyDescriptors().forEach(descriptor -> {
- if (config.getProperties().get(descriptor.getName()) == null) {
- config.getProperties().put(descriptor.getName(), descriptor.getDefaultValue());
- }
- });
- } catch (final Exception e) {
- logger.warn(String.format("Unable to create Processor of type %s to populate default values.", dto.getType()));
- }
- });
- }
-
- if (snippet.getProcessGroups() != null) {
- snippet.getProcessGroups().forEach(processGroup -> {
- ensureDefaultPropertyValuesArePopulated(processGroup.getContents());
- });
- }
- }
- }
-
- @Override
- public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional<String> idGenerationSeed) {
- // ensure id is set
- final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
- templateDTO.setId(uuid);
-
- // mark the timestamp
- templateDTO.setTimestamp(new Date());
-
- // ensure default values are populated
- ensureDefaultPropertyValuesArePopulated(templateDTO.getSnippet());
-
- // import the template
- final Template template = templateDAO.importTemplate(templateDTO, groupId);
-
- // save the flow
- controllerFacade.save();
-
- // return the template dto
- return dtoFactory.createTemplateDTO(template);
- }
-
- /**
- * Post processes a new flow snippet including validation, removing the snippet, and DTO conversion.
- *
- * @param groupId group id
- * @param snippet snippet
- * @return flow dto
- */
- private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) {
- // validate the new snippet
- validateSnippetContents(snippet);
-
- // identify all components added
- final Set<String> identifiers = new HashSet<>();
- snippet.getProcessors().stream()
- .map(proc -> proc.getId())
- .forEach(id -> identifiers.add(id));
- snippet.getConnections().stream()
- .map(conn -> conn.getId())
- .forEach(id -> identifiers.add(id));
- snippet.getInputPorts().stream()
- .map(port -> port.getId())
- .forEach(id -> identifiers.add(id));
- snippet.getOutputPorts().stream()
- .map(port -> port.getId())
- .forEach(id -> identifiers.add(id));
- snippet.getProcessGroups().stream()
- .map(group -> group.getId())
- .forEach(id -> identifiers.add(id));
- snippet.getRemoteProcessGroups().stream()
- .map(remoteGroup -> remoteGroup.getId())
- .forEach(id -> identifiers.add(id));
- snippet.getRemoteProcessGroups().stream()
- .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null)
- .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream())
- .map(remoteInputPort -> remoteInputPort.getId())
- .forEach(id -> identifiers.add(id));
- snippet.getRemoteProcessGroups().stream()
- .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null)
- .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream())
- .map(remoteOutputPort -> remoteOutputPort.getId())
- .forEach(id -> identifiers.add(id));
- snippet.getLabels().stream()
- .map(label -> label.getId())
- .forEach(id -> identifiers.add(id));
-
- final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
- final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
- return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins);
- }
-
- @Override
- public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateEncodingVersion,
- final FlowSnippetDTO requestSnippet, final String idGenerationSeed) {
-
- // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
- // was copied and this dto is only used to instantiate it's components (which as already completed)
- final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateEncodingVersion, requestSnippet, idGenerationSeed);
-
- // save the flow
- controllerFacade.save();
-
- // post process the new flow snippet
- final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet);
-
- final FlowEntity flowEntity = new FlowEntity();
- flowEntity.setFlow(flowDto);
- return flowEntity;
- }
-
- @Override
- public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) {
- controllerServiceDTO.setParentGroupId(groupId);
-
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- // request claim for component to be created... revision already verified (version == 0)
- final RevisionClaim claim = new StandardRevisionClaim(revision);
-
- final RevisionUpdate<ControllerServiceDTO> snapshot;
- if (groupId == null) {
- // update revision through revision manager
- snapshot = revisionManager.updateRevision(claim, user, () -> {
- // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock
- // on the group. The Controller Service may or may not have a Process Group (it won't if it's controller-scoped).
- final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
- controllerFacade.save();
-
- awaitValidationCompletion(controllerService);
- final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService);
-
- final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<>(dto, lastMod);
- });
- } else {
- snapshot = revisionManager.updateRevision(claim, user, () -> {
- final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
- controllerFacade.save();
-
- awaitValidationCompletion(controllerService);
- final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService);
-
- final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<>(dto, lastMod);
- });
- }
-
- final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
- }
-
- @Override
- public ControllerServiceEntity updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
- // get the component, ensure we have access to it, and perform the update request
- final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
- final RevisionUpdate<ControllerServiceDTO> snapshot = updateComponent(revision,
- controllerService,
- () -> controllerServiceDAO.updateControllerService(controllerServiceDTO),
- cs -> {
- awaitValidationCompletion(cs);
- final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(cs);
- final ControllerServiceReference ref = controllerService.getReferences();
- final ControllerServiceReferencingComponentsEntity referencingComponentsEntity =
- createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerService.getIdentifier()));
- dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
- return dto;
- });
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
- }
-
-
- @Override
- public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents(
- final Map<String, Revision> referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
-
- final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values());
-
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, user,
- new UpdateRevisionTask<ControllerServiceReferencingComponentsEntity>() {
- @Override
- public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
- final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
- final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();
-
- // get the revisions of the updated components
- final Map<String, Revision> updatedRevisions = new HashMap<>();
- for (final ComponentNode component : updated) {
- final Revision currentRevision = revisionManager.getRevision(component.getIdentifier());
- final Revision requestRevision = referenceRevisions.get(component.getIdentifier());
- updatedRevisions.put(component.getIdentifier(), currentRevision.incrementRevision(requestRevision.getClientId()));
- }
-
- // ensure the revision for all referencing components is included regardless of whether they were updated in this request
- for (final ComponentNode component : updatedReference.findRecursiveReferences(ComponentNode.class)) {
- updatedRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
- }
-
- final ControllerServiceReferencingComponentsEntity entity = createControllerServiceReferencingComponentsEntity(updatedReference, updatedRevisions);
- return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
- }
- });
-
- return update.getComponent();
- }
-
- /**
- * Finds the identifiers for all components referencing a ControllerService.
- *
- * @param reference ControllerServiceReference
- * @param visited ControllerServices we've already visited
- */
- private void findControllerServiceReferencingComponentIdentifiers(final ControllerServiceReference reference, final Set<ControllerServiceNode> visited) {
- for (final ComponentNode component : reference.getReferencingComponents()) {
-
- // if this is a ControllerService consider it's referencing components
- if (component instanceof ControllerServiceNode) {
- final ControllerServiceNode node = (ControllerServiceNode) component;
- if (!visited.contains(node)) {
- visited.add(node);
- findControllerServiceReferencingComponentIdentifiers(node.getReferences(), visited);
- }
- }
- }
- }
-
- /**
- * Creates entities for components referencing a ControllerService using their current revision.
- *
- * @param reference ControllerServiceReference
- * @return The entity
- */
- private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set<String> lockedIds) {
- final Set<ControllerServiceNode> visited = new HashSet<>();
- visited.add(reference.getReferencedComponent());
- findControllerServiceReferencingComponentIdentifiers(reference, visited);
-
- final Map<String, Revision> referencingRevisions = new HashMap<>();
- for (final ComponentNode component : reference.getReferencingComponents()) {
- referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
- }
-
- return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions);
- }
-
- /**
- * Creates entities for components referencing a ControllerService using the specified revisions.
- *
- * @param reference ControllerServiceReference
- * @param revisions The revisions
- * @return The entity
- */
- private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(
- final ControllerServiceReference reference, final Map<String, Revision> revisions) {
- final Set<ControllerServiceNode> visited = new HashSet<>();
- visited.add(reference.getReferencedComponent());
- return createControllerServiceReferencingComponentsEntity(reference, revisions, visited);
- }
-
- /**
- * Creates entities for components referencing a ControllerServcie using the specified revisions.
- *
- * @param reference ControllerServiceReference
- * @param revisions The revisions
- * @param visited Which services we've already considered (in case of cycle)
- * @return The entity
- */
- private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(
- final ControllerServiceReference reference, final Map<String, Revision> revisions, final Set<ControllerServiceNode> visited) {
-
- final String modifier = NiFiUserUtils.getNiFiUserIdentity();
- final Set<ComponentNode> referencingComponents = reference.getReferencingComponents();
-
- final Set<ControllerServiceReferencingComponentEntity> componentEntities = new HashSet<>();
- for (final ComponentNode refComponent : referencingComponents) {
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(refComponent);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(refComponent));
-
- final Revision revision = revisions.get(refComponent.getIdentifier());
- final FlowModification flowMod = new FlowModification(revision, modifier);
- final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(flowMod);
- final ControllerServiceReferencingComponentDTO dto = dtoFactory.createControllerServiceReferencingComponentDTO(refComponent);
-
- if (refComponent instanceof ControllerServiceNode) {
- final ControllerServiceNode node = (ControllerServiceNode) refComponent;
-
- // indicate if we've hit a cycle
- dto.setReferenceCycle(visited.contains(node));
-
- // mark node as visited before building the reference cycle
- visited.add(node);
-
- // if we haven't encountered this service before include it's referencing components
- if (!dto.getReferenceCycle()) {
- final ControllerServiceReference refReferences = node.getReferences();
- final Map<String, Revision> referencingRevisions = new HashMap<>(revisions);
- for (final ComponentNode component : refReferences.getReferencingComponents()) {
- referencingRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier()));
- }
- final ControllerServiceReferencingComponentsEntity references = createControllerServiceReferencingComponentsEntity(refReferences, referencingRevisions, visited);
- dto.setReferencingComponents(references.getControllerServiceReferencingComponents());
- }
- }
-
- componentEntities.add(entityFactory.createControllerServiceReferencingComponentEntity(refComponent.getIdentifier(), dto, revisionDto, permissions, operatePermissions));
- }
-
- final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity();
- entity.setControllerServiceReferencingComponents(componentEntities);
- return entity;
- }
-
- @Override
- public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) {
- final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
- final ControllerServiceDTO snapshot = deleteComponent(
- revision,
- controllerService.getResource(),
- () -> controllerServiceDAO.deleteControllerService(controllerServiceId),
- true,
- dtoFactory.createControllerServiceDto(controllerService));
-
- return entityFactory.createControllerServiceEntity(snapshot, null, permissions, operatePermissions, null);
- }
-
-
- @Override
- public RegistryClientEntity createRegistryClient(Revision revision, RegistryDTO registryDTO) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- // request claim for component to be created... revision already verified (version == 0)
- final RevisionClaim claim = new StandardRevisionClaim(revision);
-
- // update revision through revision manager
- final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(claim, user, () -> {
- // add the component
- final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO);
-
- // save the flow
- controllerFacade.save();
-
- final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<>(registry, lastMod);
- });
-
- final FlowRegistry registry = revisionUpdate.getComponent();
- return createRegistryClientEntity(registry);
- }
-
- @Override
- public RegistryClientEntity getRegistryClient(final String registryId) {
- final FlowRegistry registry = registryDAO.getFlowRegistry(registryId);
- return createRegistryClientEntity(registry);
- }
-
- private RegistryClientEntity createRegistryClientEntity(final FlowRegistry flowRegistry) {
- if (flowRegistry == null) {
- return null;
- }
-
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(flowRegistry.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getController());
- final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry);
-
- return entityFactory.createRegistryClientEntity(dto, revision, permissions);
- }
-
- private VersionedFlowEntity createVersionedFlowEntity(final String registryId, final VersionedFlow versionedFlow) {
- if (versionedFlow == null) {
- return null;
- }
-
- final VersionedFlowDTO dto = new VersionedFlowDTO();
- dto.setRegistryId(registryId);
- dto.setBucketId(versionedFlow.getBucketIdentifier());
- dto.setFlowId(versionedFlow.getIdentifier());
- dto.setFlowName(versionedFlow.getName());
- dto.setDescription(versionedFlow.getDescription());
-
- final VersionedFlowEntity entity = new VersionedFlowEntity();
- entity.setVersionedFlow(dto);
-
- return entity;
- }
-
- private VersionedFlowSnapshotMetadataEntity createVersionedFlowSnapshotMetadataEntity(final String registryId, final VersionedFlowSnapshotMetadata metadata) {
- if (metadata == null) {
- return null;
- }
-
- final VersionedFlowSnapshotMetadataEntity entity = new VersionedFlowSnapshotMetadataEntity();
- entity.setRegistryId(registryId);
- entity.setVersionedFlowMetadata(metadata);
-
- return entity;
- }
-
- @Override
- public Set<RegistryClientEntity> getRegistryClients() {
- return registryDAO.getFlowRegistries().stream()
- .map(this::createRegistryClientEntity)
- .collect(Collectors.toSet());
- }
-
- @Override
- public Set<RegistryEntity> getRegistriesForUser(final NiFiUser user) {
- return registryDAO.getFlowRegistriesForUser(user).stream()
- .map(flowRegistry -> entityFactory.createRegistryEntity(dtoFactory.createRegistryDto(flowRegistry)))
- .collect(Collectors.toSet());
- }
-
- @Override
- public Set<BucketEntity> getBucketsForUser(final String registryId, final NiFiUser user) {
- return registryDAO.getBucketsForUser(registryId, user).stream()
- .map(bucket -> {
- if (bucket == null) {
- return null;
- }
-
- final BucketDTO dto = new BucketDTO();
- dto.setId(bucket.getIdentifier());
- dto.setName(bucket.getName());
- dto.setDescription(bucket.getDescription());
- dto.setCreated(bucket.getCreatedTimestamp());
-
- final Permissions regPermissions = bucket.getPermissions();
- final PermissionsDTO permissions = new PermissionsDTO();
- permissions.setCanRead(regPermissions.getCanRead());
- permissions.setCanWrite(regPermissions.getCanWrite());
-
- return entityFactory.createBucketEntity(dto, permissions);
- })
- .collect(Collectors.toSet());
- }
-
- @Override
- public Set<VersionedFlowEntity> getFlowsForUser(String registryId, String bucketId, NiFiUser user) {
- return registryDAO.getFlowsForUser(registryId, bucketId, user).stream()
- .map(vf -> createVersionedFlowEntity(registryId, vf))
- .collect(Collectors.toSet());
- }
-
- @Override
- public Set<VersionedFlowSnapshotMetadataEntity> getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) {
- return registryDAO.getFlowVersionsForUser(registryId, bucketId, flowId, user).stream()
- .map(md -> createVersionedFlowSnapshotMetadataEntity(registryId, md))
- .collect(Collectors.toSet());
- }
-
- @Override
- public RegistryClientEntity updateRegistryClient(Revision revision, RegistryDTO registryDTO) {
- final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId());
- final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> {
- final boolean duplicateName = registryDAO.getFlowRegistries().stream()
- .anyMatch(reg -> reg.getName().equals(registryDTO.getName()) && !reg.getIdentifier().equals(registryDTO.getId()));
-
- if (duplicateName) {
- throw new IllegalStateException("Cannot update Flow Registry because a Flow Registry already exists with the name " + registryDTO.getName());
- }
-
- registry.setDescription(registryDTO.getDescription());
- registry.setName(registryDTO.getName());
- registry.setURL(registryDTO.getUri());
-
- controllerFacade.save();
-
- final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
- final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
-
- return new StandardRevisionUpdate<>(registry, lastModification);
- });
-
- final FlowRegistry updatedReg = revisionUpdate.getComponent();
- return createRegistryClientEntity(updatedReg);
- }
-
- @Override
- public void verifyDeleteRegistry(String registryId) {
- processGroupDAO.verifyDeleteFlowRegistry(registryId);
- }
-
- @Override
- public RegistryClientEntity deleteRegistryClient(final Revision revision, final String registryId) {
- final RevisionClaim claim = new StandardRevisionClaim(revision);
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> {
- final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId);
- controllerFacade.save();
- return reg;
- });
-
- return createRegistryClientEntity(registry);
- }
-
- @Override
- public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- // request claim for component to be created... revision already verified (version == 0)
- final RevisionClaim claim = new StandardRevisionClaim(revision);
-
- // update revision through revision manager
- final RevisionUpdate<ReportingTaskDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
- // create the reporting task
- final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO);
-
- // save the update
- controllerFacade.save();
- awaitValidationCompletion(reportingTask);
-
- final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask);
- final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<>(dto, lastMod);
- });
-
- final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
- }
-
- @Override
- public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
- // get the component, ensure we have access to it, and perform the update request
- final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
- final RevisionUpdate<ReportingTaskDTO> snapshot = updateComponent(revision,
- reportingTask,
- () -> reportingTaskDAO.updateReportingTask(reportingTaskDTO),
- rt -> {
- awaitValidationCompletion(rt);
- return dtoFactory.createReportingTaskDto(rt);
- });
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
- }
-
- @Override
- public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) {
- final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
- final ReportingTaskDTO snapshot = deleteComponent(
- revision,
- reportingTask.getResource(),
- () -> reportingTaskDAO.deleteReportingTask(reportingTaskId),
- true,
- dtoFactory.createReportingTaskDto(reportingTask));
-
- return entityFactory.createReportingTaskEntity(snapshot, null, permissions, operatePermissions, null);
- }
-
- @Override
- public void deleteActions(final Date endDate) {
- // get the user from the request
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- if (user == null) {
- throw new WebApplicationException(new Throwable("Unable to access details for current user."));
- }
-
- // create the purge details
- final FlowChangePurgeDetails details = new FlowChangePurgeDetails();
- details.setEndDate(endDate);
-
- // create a purge action to record that records are being removed
- final FlowChangeAction purgeAction = new FlowChangeAction();
- purgeAction.setUserIdentity(user.getIdentity());
- purgeAction.setOperation(Operation.Purge);
- purgeAction.setTimestamp(new Date());
- purgeAction.setSourceId("Flow Controller");
- purgeAction.setSourceName("History");
- purgeAction.setSourceType(Component.Controller);
- purgeAction.setActionDetails(details);
-
- // purge corresponding actions
- auditService.purgeActions(endDate, purgeAction);
- }
-
- @Override
- public ProvenanceDTO submitProvenance(final ProvenanceDTO query) {
- return controllerFacade.submitProvenance(query);
- }
-
- @Override
- public void deleteProvenance(final String queryId) {
- controllerFacade.deleteProvenanceQuery(queryId);
- }
-
- @Override
- public LineageDTO submitLineage(final LineageDTO lineage) {
- return controllerFacade.submitLineage(lineage);
- }
-
- @Override
- public void deleteLineage(final String lineageId) {
- controllerFacade.deleteLineage(lineageId);
- }
-
- @Override
- public ProvenanceEventDTO submitReplay(final Long eventId) {
- return controllerFacade.submitReplay(eventId);
- }
-
- // -----------------------------------------
- // Read Operations
- // -----------------------------------------
-
- @Override
- public SearchResultsDTO searchController(final String query) {
- return controllerFacade.search(query);
- }
-
- @Override
- public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) {
- return connectionDAO.getContent(connectionId, flowFileUuid, uri);
- }
-
- @Override
- public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) {
- return controllerFacade.getContent(eventId, uri, contentDirection);
- }
-
- @Override
- public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) {
- return controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults);
- }
-
- @Override
- public LineageDTO getLineage(final String lineageId) {
- return controllerFacade.getLineage(lineageId);
- }
-
- @Override
- public ProvenanceOptionsDTO getProvenanceSearchOptions() {
- return controllerFacade.getProvenanceSearchOptions();
- }
-
- @Override
- public ProvenanceEventDTO getProvenanceEvent(final Long id) {
- return controllerFacade.getProvenanceEvent(id);
- }
-
- @Override
- public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
- final ProcessGroupStatusDTO dto = dtoFactory.createProcessGroupStatusDto(processGroup, controllerFacade.getProcessGroupStatus(groupId));
-
- // prune the response as necessary
- if (!recursive) {
- pruneChildGroups(dto.getAggregateSnapshot());
- if (dto.getNodeSnapshots() != null) {
- for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : dto.getNodeSnapshots()) {
- pruneChildGroups(nodeSnapshot.getStatusSnapshot());
- }
- }
- }
-
- return entityFactory.createProcessGroupStatusEntity(dto, permissions);
- }
-
- private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) {
- for (final ProcessGroupStatusSnapshotEntity childProcessGroupStatusEntity : snapshot.getProcessGroupStatusSnapshots()) {
- final ProcessGroupStatusSnapshotDTO childProcessGroupStatus = childProcessGroupStatusEntity.getProcessGroupStatusSnapshot();
- childProcessGroupStatus.setConnectionStatusSnapshots(null);
- childProcessGroupStatus.setProcessGroupStatusSnapshots(null);
- childProcessGroupStatus.setInputPortStatusSnapshots(null);
- childProcessGroupStatus.setOutputPortStatusSnapshots(null);
- childProcessGroupStatus.setProcessorStatusSnapshots(null);
- childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null);
- }
- }
-
- @Override
- public ControllerStatusDTO getControllerStatus() {
- return controllerFacade.getControllerStatus();
- }
-
- @Override
- public ComponentStateDTO getProcessorState(final String processorId) {
- final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null;
- final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL);
-
- // processor will be non null as it was already found when getting the state
- final ProcessorNode processor = processorDAO.getProcessor(processorId);
- return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState);
- }
-
- @Override
- public ComponentStateDTO getControllerServiceState(final String controllerServiceId) {
- final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null;
- final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL);
-
- // controller service will be non null as it was already found when getting the state
- final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
- return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState);
- }
-
- @Override
- public ComponentStateDTO getReportingTaskState(final String reportingTaskId) {
- final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null;
- final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL);
-
- // reporting task will be non null as it was already found when getting the state
- final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
- return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState);
- }
-
- @Override
- public CountersDTO getCounters() {
- final List<Counter> counters = controllerFacade.getCounters();
- final Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
- for (final Counter counter : counters) {
- counterDTOs.add(dtoFactory.createCounterDto(counter));
- }
-
- final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs);
- final CountersDTO countersDto = new CountersDTO();
- countersDto.setAggregateSnapshot(snapshotDto);
-
- return countersDto;
- }
-
- private ConnectionEntity createConnectionEntity(final Connection connection) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
- final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier()));
- return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, permissions, status);
- }
-
- @Override
- public Set<ConnectionEntity> getConnections(final String groupId) {
- final Set<Connection> connections = connectionDAO.getConnections(groupId);
- return connections.stream()
- .map(connection -> createConnectionEntity(connection))
- .collect(Collectors.toSet());
- }
-
- @Override
- public ConnectionEntity getConnection(final String connectionId) {
- final Connection connection = connectionDAO.getConnection(connectionId);
- return createConnectionEntity(connection);
- }
-
- @Override
- public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) {
- return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId));
- }
-
- @Override
- public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) {
- final Connection connection = connectionDAO.getConnection(connectionId);
- final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(connectionId, listingRequestId));
-
- // include whether the source and destination are running
- if (connection.getSource() != null) {
- listRequest.setSourceRunning(connection.getSource().isRunning());
- }
- if (connection.getDestination() != null) {
- listRequest.setDestinationRunning(connection.getDestination().isRunning());
- }
-
- return listRequest;
- }
-
- @Override
- public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) {
- return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid));
- }
-
- @Override
- public ConnectionStatusEntity getConnectionStatus(final String connectionId) {
- final Connection connection = connectionDAO.getConnection(connectionId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
- final ConnectionStatusDTO dto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId));
- return entityFactory.createConnectionStatusEntity(dto, permissions);
- }
-
- @Override
- public StatusHistoryEntity getConnectionStatusHistory(final String connectionId) {
- final Connection connection = connectionDAO.getConnection(connectionId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
- final StatusHistoryDTO dto = controllerFacade.getConnectionStatusHistory(connectionId);
- return entityFactory.createStatusHistoryEntity(dto, permissions);
- }
-
- private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
- final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public Set<ProcessorEntity> getProcessors(final String groupId, final boolean includeDescendants) {
- final Set<ProcessorNode> processors = processorDAO.getProcessors(groupId, includeDescendants);
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- return processors.stream()
- .map(processor -> createProcessorEntity(processor, user))
- .collect(Collectors.toSet());
- }
-
- @Override
- public TemplateDTO exportTemplate(final String id) {
- final Template template = templateDAO.getTemplate(id);
- final TemplateDTO templateDetails = template.getDetails();
-
- final TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template);
- templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet()));
- return templateDTO;
- }
-
- @Override
- public TemplateDTO getTemplate(final String id) {
- return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id));
- }
-
- @Override
- public Set<TemplateEntity> getTemplates() {
- return templateDAO.getTemplates().stream()
- .map(template -> {
- final TemplateDTO dto = dtoFactory.createTemplateDTO(template);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(template);
-
- final TemplateEntity entity = new TemplateEntity();
- entity.setId(dto.getId());
- entity.setPermissions(permissions);
- entity.setTemplate(dto);
- return entity;
- }).collect(Collectors.toSet());
- }
-
- @Override
- public Set<DocumentedTypeDTO> getWorkQueuePrioritizerTypes() {
- return controllerFacade.getFlowFileComparatorTypes();
- }
-
- @Override
- public Set<DocumentedTypeDTO> getProcessorTypes(final String bundleGroup, final String bundleArtifact, final String type) {
- return controllerFacade.getFlowFileProcessorTypes(bundleGroup, bundleArtifact, type);
- }
-
- @Override
- public Set<DocumentedTypeDTO> getControllerServiceTypes(final String serviceType, final String serviceBundleGroup, final String serviceBundleArtifact, final String serviceBundleVersion,
- final String bundleGroup, final String bundleArtifact, final String type) {
- return controllerFacade.getControllerServiceTypes(serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion, bundleGroup, bundleArtifact, type);
- }
-
- @Override
- public Set<DocumentedTypeDTO> getReportingTaskTypes(final String bundleGroup, final String bundleArtifact, final String type) {
- return controllerFacade.getReportingTaskTypes(bundleGroup, bundleArtifact, type);
- }
-
- @Override
- public ProcessorEntity getProcessor(final String id) {
- final ProcessorNode processor = processorDAO.getProcessor(id);
- return createProcessorEntity(processor, NiFiUserUtils.getNiFiUser());
- }
-
- @Override
- public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) {
- final ProcessorNode processor = processorDAO.getProcessor(id);
- PropertyDescriptor descriptor = processor.getPropertyDescriptor(property);
-
- // return an invalid descriptor if the processor doesn't support this property
- if (descriptor == null) {
- descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
- }
-
- return dtoFactory.createPropertyDescriptorDto(descriptor, processor.getProcessGroup().getIdentifier());
- }
-
- @Override
- public ProcessorStatusEntity getProcessorStatus(final String id) {
- final ProcessorNode processor = processorDAO.getProcessor(id);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
- final ProcessorStatusDTO dto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id));
- return entityFactory.createProcessorStatusEntity(dto, permissions);
- }
-
- @Override
- public StatusHistoryEntity getProcessorStatusHistory(final String id) {
- final ProcessorNode processor = processorDAO.getProcessor(id);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
- final StatusHistoryDTO dto = controllerFacade.getProcessorStatusHistory(id);
- return entityFactory.createStatusHistoryEntity(dto, permissions);
- }
-
- private boolean authorizeBulletin(final Bulletin bulletin) {
- final String sourceId = bulletin.getSourceId();
- final ComponentType type = bulletin.getSourceType();
-
- final Authorizable authorizable;
- try {
- switch (type) {
- case PROCESSOR:
- authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable();
- break;
- case REPORTING_TASK:
- authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable();
- break;
- case CONTROLLER_SERVICE:
- authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable();
- break;
- case FLOW_CONTROLLER:
- authorizable = controllerFacade;
- break;
- case INPUT_PORT:
- authorizable = authorizableLookup.getInputPort(sourceId);
- break;
- case OUTPUT_PORT:
- authorizable = authorizableLookup.getOutputPort(sourceId);
- break;
- case REMOTE_PROCESS_GROUP:
- authorizable = authorizableLookup.getRemoteProcessGroup(sourceId);
- break;
- default:
- throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build());
- }
- } catch (final ResourceNotFoundException e) {
- // if the underlying component is gone, disallow
- return false;
- }
-
- // perform the authorization
- final AuthorizationResult result = authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
- return Result.Approved.equals(result.getResult());
- }
-
- @Override
- public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) {
- // build the query
- final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder()
- .groupIdMatches(query.getGroupId())
- .sourceIdMatches(query.getSourceId())
- .nameMatches(query.getName())
- .messageMatches(query.getMessage())
- .after(query.getAfter())
- .limit(query.getLimit());
-
- // perform the query
- final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build());
-
- // perform the query and generate the results - iterating in reverse order since we are
- // getting the most recent results by ordering by timestamp desc above. this gets the
- // exact results we want but in reverse order
- final List<BulletinEntity> bulletinEntities = new ArrayList<>();
- for (final ListIterator<Bulletin> bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious(); ) {
- final Bulletin bulletin = bulletinIter.previous();
- bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin)));
- }
-
- // create the bulletin board
- final BulletinBoardDTO bulletinBoard = new BulletinBoardDTO();
- bulletinBoard.setBulletins(bulletinEntities);
- bulletinBoard.setGenerated(new Date());
- return bulletinBoard;
- }
-
- @Override
- public SystemDiagnosticsDTO getSystemDiagnostics() {
- final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics();
- return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics);
- }
-
- @Override
- public List<ResourceDTO> getResources() {
- final List<Resource> resources = controllerFacade.getResources();
- final List<ResourceDTO> resourceDtos = new ArrayList<>(resources.size());
- for (final Resource resource : resources) {
- resourceDtos.add(dtoFactory.createResourceDto(resource));
- }
- return resourceDtos;
- }
-
- @Override
- public void discoverCompatibleBundles(VersionedProcessGroup versionedGroup) {
- BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), versionedGroup);
- }
-
- @Override
- public BundleCoordinate getCompatibleBundle(String type, BundleDTO bundleDTO) {
- return BundleUtils.getCompatibleBundle(controllerFacade.getExtensionManager(), type, bundleDTO);
- }
-
- @Override
- public ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate) {
- return controllerFacade.getExtensionManager().getTempComponent(classType, bundleCoordinate);
- }
-
- /**
- * Ensures the specified user has permission to access the specified port. This method does
- * not utilize the DataTransferAuthorizable as that will enforce the entire chain is
- * authorized for the transfer. This method is only invoked when obtaining the site to site
- * details so the entire chain isn't necessary.
- */
- private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) {
- final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure());
-
- // if site to site is not secure, allow all users
- if (!isSiteToSiteSecure) {
- return true;
- }
-
- final Map<String, String> userContext;
- if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
- userContext = new HashMap<>();
- userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
- } else {
- userContext = null;
- }
-
- final AuthorizationRequest request = new AuthorizationRequest.Builder()
- .resource(ResourceFactory.getDataTransferResource(port.getResource()))
- .identity(user.getIdentity())
- .groups(user.getGroups())
- .anonymous(user.isAnonymous())
- .accessAttempt(false)
- .action(RequestAction.WRITE)
- .userContext(userContext)
- .explanationSupplier(() -> "Unable to retrieve port details.")
- .build();
-
- final AuthorizationResult result = authorizer.authorize(request);
- return Result.Approved.equals(result.getResult());
- }
-
- @Override
- public ControllerDTO getSiteToSiteDetails() {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- if (user == null) {
- throw new WebApplicationException(new Throwable("Unable to access details for current user."));
- }
-
- // serialize the input ports this NiFi has access to
- final Set<PortDTO> inputPortDtos = new LinkedHashSet<>();
- final Set<RootGroupPort> inputPorts = controllerFacade.getInputPorts();
- for (final RootGroupPort inputPort : inputPorts) {
- if (isUserAuthorized(user, inputPort)) {
- final PortDTO dto = new PortDTO();
- dto.setId(inputPort.getIdentifier());
- dto.setName(inputPort.getName());
- dto.setComments(inputPort.getComments());
- dto.setState(inputPort.getScheduledState().toString());
- inputPortDtos.add(dto);
- }
- }
-
- // serialize the output ports this NiFi has access to
- final Set<PortDTO> outputPortDtos = new LinkedHashSet<>();
- for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) {
- if (isUserAuthorized(user, outputPort)) {
- final PortDTO dto = new PortDTO();
- dto.setId(outputPort.getIdentifier());
- dto.setName(outputPort.getName());
- dto.setComments(outputPort.getComments());
- dto.setState(outputPort.getScheduledState().toString());
- outputPortDtos.add(dto);
- }
- }
-
- // get the root group
- final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
- final ProcessGroupCounts counts = rootGroup.getCounts();
-
- // create the controller dto
- final ControllerDTO controllerDTO = new ControllerDTO();
- controllerDTO.setId(controllerFacade.getRootGroupId());
- controllerDTO.setInstanceId(controllerFacade.getInstanceId());
- controllerDTO.setName(controllerFacade.getName());
- controllerDTO.setComments(controllerFacade.getComments());
- controllerDTO.setInputPorts(inputPortDtos);
- controllerDTO.setOutputPorts(outputPortDtos);
- controllerDTO.setInputPortCount(inputPortDtos.size());
- controllerDTO.setOutputPortCount(outputPortDtos.size());
- controllerDTO.setRunningCount(counts.getRunningCount());
- controllerDTO.setStoppedCount(counts.getStoppedCount());
- controllerDTO.setInvalidCount(counts.getInvalidCount());
- controllerDTO.setDisabledCount(counts.getDisabledCount());
-
- // determine the site to site configuration
- controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
- controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
- controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
-
- return controllerDTO;
- }
-
- @Override
- public ControllerConfigurationEntity getControllerConfiguration() {
- final Revision rev = revisionManager.getRevision(FlowController.class.getSimpleName());
- final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade);
- final RevisionDTO revision = dtoFactory.createRevisionDTO(rev);
- return entityFactory.createControllerConfigurationEntity(dto, revision, permissions);
- }
-
- @Override
- public ControllerBulletinsEntity getControllerBulletins() {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity();
-
- final List<BulletinEntity> controllerBulletinEntities = new ArrayList<>();
-
- final Authorizable controllerAuthorizable = authorizableLookup.getController();
- final boolean authorized = controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController());
- controllerBulletinEntities.addAll(bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList()));
-
- // get the controller service bulletins
- final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
- final List<Bulletin> allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery);
- final List<BulletinEntity> controllerServiceBulletinEntities = new ArrayList<>();
- for (final Bulletin bulletin : allControllerServiceBulletins) {
- try {
- final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable();
- final boolean controllerServiceAuthorized = controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
-
- final BulletinEntity controllerServiceBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized);
- controllerServiceBulletinEntities.add(controllerServiceBulletin);
- controllerBulletinEntities.add(controllerServiceBulletin);
- } catch (final ResourceNotFoundException e) {
- // controller service missing.. skip
- }
- }
- controllerBulletinsEntity.setControllerServiceBulletins(controllerServiceBulletinEntities);
-
- // get the reporting task bulletins
- final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
- final List<Bulletin> allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery);
- final List<BulletinEntity> reportingTaskBulletinEntities = new ArrayList<>();
- for (final Bulletin bulletin : allReportingTaskBulletins) {
- try {
- final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable();
- final boolean reportingTaskAuthorizableAuthorized = reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user);
-
- final BulletinEntity reportingTaskBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized);
- reportingTaskBulletinEntities.add(reportingTaskBulletin);
- controllerBulletinEntities.add(reportingTaskBulletin);
- } catch (final ResourceNotFoundException e) {
- // reporting task missing.. skip
- }
- }
- controllerBulletinsEntity.setReportingTaskBulletins(reportingTaskBulletinEntities);
-
- controllerBulletinsEntity.setBulletins(pruneAndSortBulletins(controllerBulletinEntities, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER));
- return controllerBulletinsEntity;
- }
-
- @Override
- public FlowConfigurationEntity getFlowConfiguration() {
- final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval(),
- properties.getDefaultBackPressureObjectThreshold(), properties.getDefaultBackPressureDataSizeThreshold(),properties.getDcaeDistributorApiHostname());
- final FlowConfigurationEntity entity = new FlowConfigurationEntity();
- entity.setFlowConfiguration(dto);
- return entity;
- }
-
- @Override
- public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) {
- final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId);
- return createAccessPolicyEntity(accessPolicy);
- }
-
- @Override
- public AccessPolicyEntity getAccessPolicy(final RequestAction requestAction, final String resource) {
- Authorizable authorizable;
- try {
- authorizable = authorizableLookup.getAuthorizableFromResource(resource);
- } catch (final ResourceNotFoundException e) {
- // unable to find the underlying authorizable... user authorized based on top level /policies... create
- // an anonymous authorizable to attempt to locate an existing policy for this resource
- authorizable = new Authorizable() {
- @Override
- public Authorizable getParentAuthorizable() {
- return null;
- }
-
- @Override
- public Resource getResource() {
- return new Resource() {
- @Override
- public String getIdentifier() {
- return resource;
- }
-
- @Override
- public String getName() {
- return resource;
- }
-
- @Override
- public String getSafeDescription() {
- return "Policy " + resource;
- }
- };
- }
- };
- }
-
- final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(requestAction, authorizable);
- return createAccessPolicyEntity(accessPolicy);
- }
-
- private AccessPolicyEntity createAccessPolicyEntity(final AccessPolicy accessPolicy) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicy.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicy.getIdentifier()));
- final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
- return entityFactory.createAccessPolicyEntity(
- dtoFactory.createAccessPolicyDto(accessPolicy,
- accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()),
- accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()), componentReference),
- revision, permissions);
- }
-
- @Override
- public UserEntity getUser(final String userId) {
- final User user = userDAO.getUser(userId);
- return createUserEntity(user, true);
- }
-
- @Override
- public Set<UserEntity> getUsers() {
- final Set<User> users = userDAO.getUsers();
- return users.stream()
- .map(user -> createUserEntity(user, false))
- .collect(Collectors.toSet());
- }
-
- private UserEntity createUserEntity(final User user, final boolean enforceUserExistence) {
- final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
- final Set<TenantEntity> userGroups = userGroupDAO.getUserGroupsForUser(user.getIdentifier()).stream()
- .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(enforceUserExistence)).collect(Collectors.toSet());
- final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUser(user.getIdentifier()).stream()
- .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
- return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups, policyEntities), userRevision, permissions);
- }
-
- private UserGroupEntity createUserGroupEntity(final Group userGroup, final boolean enforceGroupExistence) {
- final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
- final Set<TenantEntity> users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(enforceGroupExistence)).collect(Collectors.toSet());
- final Set<AccessPolicySummaryEntity> policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream()
- .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet());
- return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users, policyEntities), userGroupRevision, permissions);
- }
-
- @Override
- public UserGroupEntity getUserGroup(final String userGroupId) {
- final Group userGroup = userGroupDAO.getUserGroup(userGroupId);
- return createUserGroupEntity(userGroup, true);
- }
-
- @Override
- public Set<UserGroupEntity> getUserGroups() {
- final Set<Group> userGroups = userGroupDAO.getUserGroups();
- return userGroups.stream()
- .map(userGroup -> createUserGroupEntity(userGroup, false))
- .collect(Collectors.toSet());
- }
-
- private LabelEntity createLabelEntity(final Label label) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
- return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), revision, permissions);
- }
-
- @Override
- public Set<LabelEntity> getLabels(final String groupId) {
- final Set<Label> labels = labelDAO.getLabels(groupId);
- return labels.stream()
- .map(label -> createLabelEntity(label))
- .collect(Collectors.toSet());
- }
-
- @Override
- public LabelEntity getLabel(final String labelId) {
- final Label label = labelDAO.getLabel(labelId);
- return createLabelEntity(label);
- }
-
- private FunnelEntity createFunnelEntity(final Funnel funnel) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(funnel.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
- return entityFactory.createFunnelEntity(dtoFactory.createFunnelDto(funnel), revision, permissions);
- }
-
- @Override
- public Set<FunnelEntity> getFunnels(final String groupId) {
- final Set<Funnel> funnels = funnelDAO.getFunnels(groupId);
- return funnels.stream()
- .map(funnel -> createFunnelEntity(funnel))
- .collect(Collectors.toSet());
- }
-
- @Override
- public FunnelEntity getFunnel(final String funnelId) {
- final Funnel funnel = funnelDAO.getFunnel(funnelId);
- return createFunnelEntity(funnel);
- }
-
- private PortEntity createInputPortEntity(final Port port) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, NiFiUserUtils.getNiFiUser());
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), NiFiUserUtils.getNiFiUser());
- final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities);
- }
-
- private PortEntity createOutputPortEntity(final Port port) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, NiFiUserUtils.getNiFiUser());
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), NiFiUserUtils.getNiFiUser());
- final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public Set<PortEntity> getInputPorts(final String groupId) {
- final Set<Port> inputPorts = inputPortDAO.getPorts(groupId);
- return inputPorts.stream()
- .map(port -> createInputPortEntity(port))
- .collect(Collectors.toSet());
- }
-
- @Override
- public Set<PortEntity> getOutputPorts(final String groupId) {
- final Set<Port> ports = outputPortDAO.getPorts(groupId);
- return ports.stream()
- .map(port -> createOutputPortEntity(port))
- .collect(Collectors.toSet());
- }
-
- private ProcessGroupEntity createProcessGroupEntity(final ProcessGroup group) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(group);
- final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(group.getIdentifier()));
- final List<BulletinEntity> bulletins = getProcessGroupBulletins(group);
- return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, permissions, status, bulletins);
- }
-
- private List<BulletinEntity> getProcessGroupBulletins(final ProcessGroup group) {
- final List<Bulletin> bulletins = new ArrayList<>(bulletinRepository.findBulletinsForGroupBySource(group.getIdentifier()));
-
- for (final ProcessGroup descendantGroup : group.findAllProcessGroups()) {
- bulletins.addAll(bulletinRepository.findBulletinsForGroupBySource(descendantGroup.getIdentifier()));
- }
-
- List<BulletinEntity> bulletinEntities = new ArrayList<>();
- for (final Bulletin bulletin : bulletins) {
- bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin)));
- }
-
- return pruneAndSortBulletins(bulletinEntities, BulletinRepository.MAX_BULLETINS_PER_COMPONENT);
- }
-
- private List<BulletinEntity> pruneAndSortBulletins(final List<BulletinEntity> bulletinEntities, final int maxBulletins) {
- // sort the bulletins
- Collections.sort(bulletinEntities, new Comparator<BulletinEntity>() {
- @Override
- public int compare(BulletinEntity o1, BulletinEntity o2) {
- if (o1 == null && o2 == null) {
- return 0;
- }
- if (o1 == null) {
- return 1;
- }
- if (o2 == null) {
- return -1;
- }
-
- return -Long.compare(o1.getId(), o2.getId());
- }
- });
-
- // prune the response to only include the max number of bulletins
- if (bulletinEntities.size() > maxBulletins) {
- return bulletinEntities.subList(0, maxBulletins);
- } else {
- return bulletinEntities;
- }
- }
-
- @Override
- public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId) {
- final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId);
- return groups.stream()
- .map(group -> createProcessGroupEntity(group))
- .collect(Collectors.toSet());
- }
-
- private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg, final NiFiUser user) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpg.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg, user);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(rpg), user);
- final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(rpg, controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createRemoteProcessGroupEntity(dtoFactory.createRemoteProcessGroupDto(rpg), revision, permissions, operatePermissions, status, bulletinEntities);
- }
-
- @Override
- public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final Set<RemoteProcessGroup> rpgs = remoteProcessGroupDAO.getRemoteProcessGroups(groupId);
- return rpgs.stream()
- .map(rpg -> createRemoteGroupEntity(rpg, user))
- .collect(Collectors.toSet());
- }
-
- @Override
- public PortEntity getInputPort(final String inputPortId) {
- final Port port = inputPortDAO.getPort(inputPortId);
- return createInputPortEntity(port);
- }
-
- @Override
- public PortStatusEntity getInputPortStatus(final String inputPortId) {
- final Port inputPort = inputPortDAO.getPort(inputPortId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPort);
- final PortStatusDTO dto = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortId));
- return entityFactory.createPortStatusEntity(dto, permissions);
- }
-
- @Override
- public PortEntity getOutputPort(final String outputPortId) {
- final Port port = outputPortDAO.getPort(outputPortId);
- return createOutputPortEntity(port);
- }
-
- @Override
- public PortStatusEntity getOutputPortStatus(final String outputPortId) {
- final Port outputPort = outputPortDAO.getPort(outputPortId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPort);
- final PortStatusDTO dto = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortId));
- return entityFactory.createPortStatusEntity(dto, permissions);
- }
-
- @Override
- public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) {
- final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
- return createRemoteGroupEntity(rpg, NiFiUserUtils.getNiFiUser());
- }
-
- @Override
- public RemoteProcessGroupStatusEntity getRemoteProcessGroupStatus(final String id) {
- final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
- final RemoteProcessGroupStatusDTO dto = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(id));
- return entityFactory.createRemoteProcessGroupStatusEntity(dto, permissions);
- }
-
- @Override
- public StatusHistoryEntity getRemoteProcessGroupStatusHistory(final String id) {
- final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup);
- final StatusHistoryDTO dto = controllerFacade.getRemoteProcessGroupStatusHistory(id);
- return entityFactory.createStatusHistoryEntity(dto, permissions);
- }
-
- @Override
- public CurrentUserEntity getCurrentUser() {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final CurrentUserEntity entity = new CurrentUserEntity();
- entity.setIdentity(user.getIdentity());
- entity.setAnonymous(user.isAnonymous());
- entity.setProvenancePermissions(dtoFactory.createPermissionsDto(authorizableLookup.getProvenance()));
- entity.setCountersPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getCounters()));
- entity.setTenantsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
- entity.setControllerPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getController()));
- entity.setPoliciesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPolicies()));
- entity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem()));
- entity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers()));
-
- entity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents()));
-
- final Set<ComponentRestrictionPermissionDTO> componentRestrictionPermissions = new HashSet<>();
- Arrays.stream(RequiredPermission.values()).forEach(requiredPermission -> {
- final PermissionsDTO restrictionPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents(requiredPermission));
-
- final RequiredPermissionDTO requiredPermissionDto = new RequiredPermissionDTO();
- requiredPermissionDto.setId(requiredPermission.getPermissionIdentifier());
- requiredPermissionDto.setLabel(requiredPermission.getPermissionLabel());
-
- final ComponentRestrictionPermissionDTO componentRestrictionPermissionDto = new ComponentRestrictionPermissionDTO();
- componentRestrictionPermissionDto.setRequiredPermission(requiredPermissionDto);
- componentRestrictionPermissionDto.setPermissions(restrictionPermissions);
-
- componentRestrictionPermissions.add(componentRestrictionPermissionDto);
- });
- entity.setComponentRestrictionPermissions(componentRestrictionPermissions);
-
- return entity;
- }
-
- @Override
- public ProcessGroupFlowEntity getProcessGroupFlow(final String groupId) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
-
- // Get the Process Group Status but we only need a status depth of one because for any child process group,
- // we ignore the status of each individual components. I.e., if Process Group A has child Group B, and child Group B
- // has a Processor, we don't care about the individual stats of that Processor because the ProcessGroupFlowEntity
- // doesn't include that anyway. So we can avoid including the information in the status that is returned.
- final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId, 1);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
- return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager, this::getProcessGroupBulletins), permissions);
- }
-
- @Override
- public ProcessGroupEntity getProcessGroup(final String groupId) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- return createProcessGroupEntity(processGroup);
- }
-
- private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) {
- final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(serviceNode);
-
- final ControllerServiceReference ref = serviceNode.getReferences();
- final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref, serviceIds);
- dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
-
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode, NiFiUserUtils.getNiFiUser());
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(serviceNode), NiFiUserUtils.getNiFiUser());
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createControllerServiceEntity(dto, revision, permissions, operatePermissions, bulletinEntities);
- }
-
- @Override
- public VariableRegistryEntity getVariableRegistry(final String groupId, final boolean includeAncestorGroups) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- if (processGroup == null) {
- throw new ResourceNotFoundException("Could not find group with ID " + groupId);
- }
-
- return createVariableRegistryEntity(processGroup, includeAncestorGroups);
- }
-
- private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) {
- final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup, revisionManager);
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
-
- if (includeAncestorGroups) {
- ProcessGroup parent = processGroup.getParent();
- while (parent != null) {
- final PermissionsDTO parentPerms = dtoFactory.createPermissionsDto(parent);
- if (Boolean.TRUE.equals(parentPerms.getCanRead())) {
- final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent, revisionManager);
- final Set<VariableEntity> parentVariables = parentRegistryDto.getVariables();
- registryDto.getVariables().addAll(parentVariables);
- }
-
- parent = parent.getParent();
- }
- }
-
- return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
- }
-
- @Override
- public VariableRegistryEntity populateAffectedComponents(final VariableRegistryDTO variableRegistryDto) {
- final String groupId = variableRegistryDto.getProcessGroupId();
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- if (processGroup == null) {
- throw new ResourceNotFoundException("Could not find group with ID " + groupId);
- }
-
- final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup, revisionManager);
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
- return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
- }
-
- @Override
- public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
- final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
- final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet());
-
- return serviceNodes.stream()
- .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds))
- .collect(Collectors.toSet());
- }
-
- @Override
- public ControllerServiceEntity getControllerService(final String controllerServiceId) {
- final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
- return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId));
- }
-
- @Override
- public PropertyDescriptorDTO getControllerServicePropertyDescriptor(final String id, final String property) {
- final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id);
- PropertyDescriptor descriptor = controllerService.getControllerServiceImplementation().getPropertyDescriptor(property);
-
- // return an invalid descriptor if the controller service doesn't support this property
- if (descriptor == null) {
- descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
- }
-
- final String groupId = controllerService.getProcessGroup() == null ? null : controllerService.getProcessGroup().getIdentifier();
- return dtoFactory.createPropertyDescriptorDto(descriptor, groupId);
- }
-
- @Override
- public ControllerServiceReferencingComponentsEntity getControllerServiceReferencingComponents(final String controllerServiceId) {
- final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId);
- final ControllerServiceReference ref = service.getReferences();
- return createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerServiceId));
- }
-
- private ReportingTaskEntity createReportingTaskEntity(final ReportingTaskNode reportingTask) {
- final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(reportingTask.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask);
- final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createReportingTaskEntity(dtoFactory.createReportingTaskDto(reportingTask), revision, permissions, operatePermissions, bulletinEntities);
- }
-
- @Override
- public Set<ReportingTaskEntity> getReportingTasks() {
- final Set<ReportingTaskNode> reportingTasks = reportingTaskDAO.getReportingTasks();
- return reportingTasks.stream()
- .map(reportingTask -> createReportingTaskEntity(reportingTask))
- .collect(Collectors.toSet());
- }
-
- @Override
- public ReportingTaskEntity getReportingTask(final String reportingTaskId) {
- final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
- return createReportingTaskEntity(reportingTask);
- }
-
- @Override
- public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(final String id, final String property) {
- final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(id);
- PropertyDescriptor descriptor = reportingTask.getReportingTask().getPropertyDescriptor(property);
-
- // return an invalid descriptor if the reporting task doesn't support this property
- if (descriptor == null) {
- descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
- }
-
- return dtoFactory.createPropertyDescriptorDto(descriptor, null);
- }
-
- @Override
- public StatusHistoryEntity getProcessGroupStatusHistory(final String groupId) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
- final StatusHistoryDTO dto = controllerFacade.getProcessGroupStatusHistory(groupId);
- return entityFactory.createStatusHistoryEntity(dto, permissions);
- }
-
- @Override
- public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
-
- final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
- final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
-
- // Create a VersionedProcessGroup snapshot of the flow as it is currently.
- final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
-
- final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();
- final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
-
- final VersionedFlow versionedFlow = new VersionedFlow();
- versionedFlow.setBucketIdentifier(versionedFlowDto.getBucketId());
- versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
- versionedFlow.setDescription(versionedFlowDto.getDescription());
- versionedFlow.setModifiedTimestamp(versionedFlow.getCreatedTimestamp());
- versionedFlow.setName(versionedFlowDto.getFlowName());
- versionedFlow.setIdentifier(flowId);
-
- // Add the Versioned Flow and first snapshot to the Flow Registry
- final String registryId = requestEntity.getVersionedFlow().getRegistryId();
- final VersionedFlowSnapshot registeredSnapshot;
- final VersionedFlow registeredFlow;
-
- String action = "create the flow";
- try {
- // first, create the flow in the registry, if necessary
- if (versionedFlowDto.getFlowId() == null) {
- registeredFlow = registerVersionedFlow(registryId, versionedFlow);
- } else {
- registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId());
- }
-
- action = "add the local flow to the Flow Registry as the first Snapshot";
-
- // add first snapshot to the flow in the registry
- registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion);
- } catch (final NiFiRegistryException e) {
- throw new IllegalArgumentException(e.getLocalizedMessage());
- } catch (final IOException ioe) {
- throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to " + action);
- }
-
- final Bucket bucket = registeredSnapshot.getBucket();
- final VersionedFlow flow = registeredSnapshot.getFlow();
-
- // Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
- final VersionControlInformationDTO vci = new VersionControlInformationDTO();
- vci.setBucketId(bucket.getIdentifier());
- vci.setBucketName(bucket.getName());
- vci.setFlowId(flow.getIdentifier());
- vci.setFlowName(flow.getName());
- vci.setFlowDescription(flow.getDescription());
- vci.setGroupId(groupId);
- vci.setRegistryId(registryId);
- vci.setRegistryName(getFlowRegistryName(registryId));
- vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
- vci.setState(VersionedFlowState.UP_TO_DATE.name());
-
- final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
-
- final Revision groupRevision = revisionManager.getRevision(groupId);
- final RevisionDTO groupRevisionDto = dtoFactory.createRevisionDTO(groupRevision);
-
- final VersionControlComponentMappingEntity entity = new VersionControlComponentMappingEntity();
- entity.setVersionControlInformation(vci);
- entity.setProcessGroupRevision(groupRevisionDto);
- entity.setVersionControlComponentMapping(mapping);
- return entity;
- }
-
- @Override
- public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) {
- final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
- if (registry == null) {
- throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId);
- }
-
- try {
- return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
- } catch (final IOException | NiFiRegistryException e) {
- throw new NiFiCoreException("Failed to remove flow from Flow Registry due to " + e.getMessage(), e);
- }
- }
-
- @Override
- public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
- if (versionControlInfo == null) {
- return null;
- }
-
- final VersionControlInformationDTO versionControlDto = dtoFactory.createVersionControlInformationDto(processGroup);
- final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId));
- return entityFactory.createVersionControlInformationEntity(versionControlDto, groupRevision);
- }
-
- private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
- final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
- final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
- return versionedGroup;
- }
-
- @Override
- public FlowComparisonEntity getLocalModifications(final String processGroupId) {
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
- final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
- if (versionControlInfo == null) {
- throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control");
- }
-
- final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryIdentifier());
- if (flowRegistry == null) {
- throw new IllegalStateException("Process Group with ID " + processGroupId + " is tracking to a flow in Flow Registry with ID " + versionControlInfo.getRegistryIdentifier()
- + " but cannot find a Flow Registry with that identifier");
- }
-
- final VersionedFlowSnapshot versionedFlowSnapshot;
- try {
- versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
- versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true, NiFiUserUtils.getNiFiUser());
- } catch (final IOException | NiFiRegistryException e) {
- throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e);
- }
-
- final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
- final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
- final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
-
- final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
- final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
-
- final Set<String> ancestorServiceIds = getAncestorGroupServiceIds(processGroup);
- final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
- final FlowComparison flowComparison = flowComparator.compare();
-
- final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
-
- final FlowComparisonEntity entity = new FlowComparisonEntity();
- entity.setComponentDifferences(differenceDtos);
- return entity;
- }
-
- private Set<String> getAncestorGroupServiceIds(final ProcessGroup group) {
- final Set<String> ancestorServiceIds;
- ProcessGroup parentGroup = group.getParent();
-
- if (parentGroup == null) {
- ancestorServiceIds = Collections.emptySet();
- } else {
- ancestorServiceIds = parentGroup.getControllerServices(true).stream()
- .map(cs -> {
- // We want to map the Controller Service to its Versioned Component ID, if it has one.
- // If it does not have one, we want to generate it in the same way that our Flow Mapper does
- // because this allows us to find the Controller Service when doing a Flow Diff.
- final Optional<String> versionedId = cs.getVersionedComponentId();
- if (versionedId.isPresent()) {
- return versionedId.get();
- }
-
- return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
- })
- .collect(Collectors.toSet());
- }
-
- return ancestorServiceIds;
- }
-
- @Override
- public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) {
- final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
- if (registry == null) {
- throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
- }
-
- try {
- return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
- } catch (final IOException | NiFiRegistryException e) {
- throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
- }
- }
-
- private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
- final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
- if (registry == null) {
- throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
- }
-
- return registry.getVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
- }
-
- @Override
- public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
- final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) {
- final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
- if (registry == null) {
- throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
- }
-
- try {
- return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
- } catch (final IOException | NiFiRegistryException e) {
- throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
- }
- }
-
- @Override
- public VersionControlInformationEntity setVersionControlInformation(final Revision revision, final String processGroupId,
- final VersionControlInformationDTO versionControlInfo, final Map<String, String> versionedComponentMapping) {
-
- final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
-
- final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision,
- group,
- () -> processGroupDAO.updateVersionControlInformation(versionControlInfo, versionedComponentMapping),
- processGroup -> dtoFactory.createVersionControlInformationDto(processGroup));
-
- return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()));
- }
-
- @Override
- public VersionControlInformationEntity deleteVersionControl(final Revision revision, final String processGroupId) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
-
- final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision,
- group,
- () -> processGroupDAO.disconnectVersionControl(processGroupId),
- processGroup -> dtoFactory.createVersionControlInformationDto(group));
-
- return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()));
- }
-
- @Override
- public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
- group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
- }
-
- @Override
- public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
- group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId);
- }
-
- @Override
- public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
- group.verifyCanRevertLocalModifications();
-
- // verify that the process group can be updated to the given snapshot. We do not verify that connections can
- // be removed, because the flow may still be running, and it only matters that the connections can be removed once the components
- // have been stopped.
- group.verifyCanUpdate(versionedFlowSnapshot, false, false);
- }
-
- @Override
- public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
-
- final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
- final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
-
- final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
- final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents());
-
- final Set<String> ancestorGroupServiceIds = getAncestorGroupServiceIds(group);
- final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor());
- final FlowComparison comparison = flowComparator.compare();
-
- final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
- .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
- .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
- .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
- .filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
- .map(difference -> {
- final VersionedComponent localComponent = difference.getComponentA();
-
- final String state;
- switch (localComponent.getComponentType()) {
- case CONTROLLER_SERVICE:
- final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
- state = controllerServiceDAO.getControllerService(serviceId).getState().name();
- break;
- case PROCESSOR:
- final String processorId = ((InstantiatedVersionedProcessor) localComponent).getInstanceId();
- state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name();
- break;
- case REMOTE_INPUT_PORT:
- final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
- state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name();
- break;
- case REMOTE_OUTPUT_PORT:
- final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
- state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name();
- break;
- default:
- state = null;
- break;
- }
-
- return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state);
- })
- .collect(Collectors.toCollection(HashSet::new));
-
- for (final FlowDifference difference : comparison.getDifferences()) {
- // Ignore these as local differences for now because we can't do anything with it
- if (difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED) {
- continue;
- }
-
- // Ignore differences for adding remote ports
- if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
- continue;
- }
-
- if (FlowDifferenceFilters.isIgnorableVersionedFlowCoordinateChange(difference)) {
- continue;
- }
-
- final VersionedComponent localComponent = difference.getComponentA();
- if (localComponent == null) {
- continue;
- }
-
- // If any Process Group is removed, consider all components below that Process Group as an affected component
- if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP) {
- final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId();
- final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId);
-
- localGroup.findAllProcessors().stream()
- .map(comp -> createAffectedComponentEntity(comp))
- .forEach(affectedComponents::add);
- localGroup.findAllFunnels().stream()
- .map(comp -> createAffectedComponentEntity(comp))
- .forEach(affectedComponents::add);
- localGroup.findAllInputPorts().stream()
- .map(comp -> createAffectedComponentEntity(comp))
- .forEach(affectedComponents::add);
- localGroup.findAllOutputPorts().stream()
- .map(comp -> createAffectedComponentEntity(comp))
- .forEach(affectedComponents::add);
- localGroup.findAllRemoteProcessGroups().stream()
- .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
- .map(comp -> createAffectedComponentEntity(comp))
- .forEach(affectedComponents::add);
- localGroup.findAllControllerServices().stream()
- .map(comp -> createAffectedComponentEntity(comp))
- .forEach(affectedComponents::add);
- }
-
- if (localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE) {
- final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
- final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
-
- final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
- for (final ControllerServiceNode referencingService : referencingServices) {
- affectedComponents.add(createAffectedComponentEntity(referencingService));
- }
-
- final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
- for (final ProcessorNode referencingProcessor : referencingProcessors) {
- affectedComponents.add(createAffectedComponentEntity(referencingProcessor));
- }
- }
- }
-
- // Create a map of all connectable components by versioned component ID to the connectable component itself
- final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>();
- mapToConnectableId(group.findAllFunnels(), connectablesByVersionId);
- mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId);
- mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId);
- mapToConnectableId(group.findAllProcessors(), connectablesByVersionId);
-
- final List<RemoteGroupPort> remotePorts = new ArrayList<>();
- for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) {
- remotePorts.addAll(rpg.getInputPorts());
- remotePorts.addAll(rpg.getOutputPorts());
- }
- mapToConnectableId(remotePorts, connectablesByVersionId);
-
- // If any connection is added or modified, we need to stop both the source (if it exists in the flow currently)
- // and the destination (if it exists in the flow currently).
- for (final FlowDifference difference : comparison.getDifferences()) {
- VersionedComponent component = difference.getComponentA();
- if (component == null) {
- component = difference.getComponentB();
- }
-
- if (component.getComponentType() != org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
- continue;
- }
-
- final VersionedConnection connection = (VersionedConnection) component;
-
- final String sourceVersionedId = connection.getSource().getId();
- final List<Connectable> sources = connectablesByVersionId.get(sourceVersionedId);
- if (sources != null) {
- for (final Connectable source : sources) {
- affectedComponents.add(createAffectedComponentEntity(source));
- }
- }
-
- final String destinationVersionId = connection.getDestination().getId();
- final List<Connectable> destinations = connectablesByVersionId.get(destinationVersionId);
- if (destinations != null) {
- for (final Connectable destination : destinations) {
- affectedComponents.add(createAffectedComponentEntity(destination));
- }
- }
- }
-
- return affectedComponents;
- }
-
- private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) {
- for (final Connectable connectable : connectables) {
- final Optional<String> versionedIdOption = connectable.getVersionedComponentId();
-
- // Determine the Versioned ID by using the ID that is assigned, if one is. Otherwise,
- // we will calculate the Versioned ID. This allows us to map connectables that currently are not under
- // version control. We have to do this so that if we are changing flow versions and have a component that is running and it does not exist
- // in the Versioned Flow, we still need to be able to create an AffectedComponentDTO for it.
- final String versionedId;
- if (versionedIdOption.isPresent()) {
- versionedId = versionedIdOption.get();
- } else {
- versionedId = UUID.nameUUIDFromBytes(connectable.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
- }
-
- final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId, key -> new ArrayList<>());
- byVersionedId.add(connectable);
- }
- }
-
-
- private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable) {
- final AffectedComponentEntity entity = new AffectedComponentEntity();
- entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectable.getIdentifier())));
- entity.setId(connectable.getIdentifier());
-
- final Authorizable authorizable = getAuthorizable(connectable);
- final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
- entity.setPermissions(permissionsDto);
-
- final AffectedComponentDTO dto = new AffectedComponentDTO();
- dto.setId(connectable.getIdentifier());
- dto.setReferenceType(connectable.getConnectableType().name());
- dto.setState(connectable.getScheduledState().name());
-
- final String groupId = connectable instanceof RemoteGroupPort ? ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : connectable.getProcessGroupIdentifier();
- dto.setProcessGroupId(groupId);
-
- entity.setComponent(dto);
- return entity;
- }
-
- private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode) {
- final AffectedComponentEntity entity = new AffectedComponentEntity();
- entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())));
- entity.setId(serviceNode.getIdentifier());
-
- final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable();
- final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
- entity.setPermissions(permissionsDto);
-
- final AffectedComponentDTO dto = new AffectedComponentDTO();
- dto.setId(serviceNode.getIdentifier());
- dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
- dto.setProcessGroupId(serviceNode.getProcessGroupIdentifier());
- dto.setState(serviceNode.getState().name());
-
- entity.setComponent(dto);
- return entity;
- }
-
- private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState) {
- final AffectedComponentEntity entity = new AffectedComponentEntity();
- entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId())));
- entity.setId(instance.getInstanceId());
-
- final Authorizable authorizable = getAuthorizable(componentTypeName, instance);
- final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
- entity.setPermissions(permissionsDto);
-
- final AffectedComponentDTO dto = new AffectedComponentDTO();
- dto.setId(instance.getInstanceId());
- dto.setReferenceType(componentTypeName);
- dto.setProcessGroupId(instance.getInstanceGroupId());
- dto.setState(componentState);
-
- entity.setComponent(dto);
- return entity;
- }
-
-
- private Authorizable getAuthorizable(final Connectable connectable) {
- switch (connectable.getConnectableType()) {
- case REMOTE_INPUT_PORT:
- case REMOTE_OUTPUT_PORT:
- final String rpgId = ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier();
- return authorizableLookup.getRemoteProcessGroup(rpgId);
- default:
- return authorizableLookup.getLocalConnectable(connectable.getIdentifier());
- }
- }
-
- private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) {
- final String componentId = versionedComponent.getInstanceId();
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE.name())) {
- return authorizableLookup.getControllerService(componentId).getAuthorizable();
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONNECTION.name())) {
- return authorizableLookup.getConnection(componentId).getAuthorizable();
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.FUNNEL.name())) {
- return authorizableLookup.getFunnel(componentId);
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.INPUT_PORT.name())) {
- return authorizableLookup.getInputPort(componentId);
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.OUTPUT_PORT.name())) {
- return authorizableLookup.getOutputPort(componentId);
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.LABEL.name())) {
- return authorizableLookup.getLabel(componentId);
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP.name())) {
- return authorizableLookup.getProcessGroup(componentId).getAuthorizable();
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESSOR.name())) {
- return authorizableLookup.getProcessor(componentId).getAuthorizable();
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_INPUT_PORT.name())) {
- return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_OUTPUT_PORT.name())) {
- return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
- }
-
- if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) {
- return authorizableLookup.getRemoteProcessGroup(componentId);
- }
-
- return null;
- }
-
- @Override
- public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
- final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
- if (flowRegistry == null) {
- throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
- }
-
- final VersionedFlowSnapshot snapshot;
- try {
- snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
- } catch (final NiFiRegistryException | IOException e) {
- logger.error(e.getMessage(), e);
- throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
- + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
- }
-
- return snapshot;
- }
-
- @Override
- public String getFlowRegistryName(final String flowRegistryId) {
- final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId);
- return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
- }
-
- private List<Revision> getComponentRevisions(final ProcessGroup processGroup, final boolean includeGroupRevision) {
- final List<Revision> revisions = new ArrayList<>();
- if (includeGroupRevision) {
- revisions.add(revisionManager.getRevision(processGroup.getIdentifier()));
- }
-
- processGroup.findAllConnections().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllControllerServices().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllFunnels().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllInputPorts().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllOutputPorts().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllLabels().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllProcessGroups().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllProcessors().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllRemoteProcessGroups().stream()
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
- processGroup.findAllRemoteProcessGroups().stream()
- .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
- .map(component -> revisionManager.getRevision(component.getIdentifier()))
- .forEach(revisions::add);
-
- return revisions;
- }
-
- @Override
- public ProcessGroupEntity updateProcessGroupContents(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
- final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
-
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- final List<Revision> revisions = getComponentRevisions(processGroup, false);
- revisions.add(revision);
-
- final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
-
- final RevisionUpdate<ProcessGroupDTO> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<ProcessGroupDTO>() {
- @Override
- public RevisionUpdate<ProcessGroupDTO> update() {
- // update the Process Group
- processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
-
- // update the revisions
- final Set<Revision> updatedRevisions = revisions.stream()
- .map(rev -> revisionManager.getRevision(rev.getComponentId()).incrementRevision(revision.getClientId()))
- .collect(Collectors.toSet());
-
- // save
- controllerFacade.save();
-
- // gather details for response
- final ProcessGroupDTO dto = dtoFactory.createProcessGroupDto(processGroup);
-
- final Revision updatedRevision = revisionManager.getRevision(groupId).incrementRevision(revision.getClientId());
- final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
- return new StandardRevisionUpdate<>(dto, lastModification, updatedRevisions);
- }
- });
-
- final FlowModification lastModification = revisionUpdate.getLastModification();
-
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
- final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(lastModification);
- final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
- final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createProcessGroupEntity(revisionUpdate.getComponent(), updatedRevision, permissions, status, bulletinEntities);
- }
-
- private AuthorizationResult authorizeAction(final Action action) {
- final String sourceId = action.getSourceId();
- final Component type = action.getSourceType();
-
- Authorizable authorizable;
- try {
- switch (type) {
- case Processor:
- authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable();
- break;
- case ReportingTask:
- authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable();
- break;
- case ControllerService:
- authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable();
- break;
- case Controller:
- authorizable = controllerFacade;
- break;
- case InputPort:
- authorizable = authorizableLookup.getInputPort(sourceId);
- break;
- case OutputPort:
- authorizable = authorizableLookup.getOutputPort(sourceId);
- break;
- case ProcessGroup:
- authorizable = authorizableLookup.getProcessGroup(sourceId).getAuthorizable();
- break;
- case RemoteProcessGroup:
- authorizable = authorizableLookup.getRemoteProcessGroup(sourceId);
- break;
- case Funnel:
- authorizable = authorizableLookup.getFunnel(sourceId);
- break;
- case Connection:
- authorizable = authorizableLookup.getConnection(sourceId).getAuthorizable();
- break;
- case AccessPolicy:
- authorizable = authorizableLookup.getAccessPolicyById(sourceId);
- break;
- case User:
- case UserGroup:
- authorizable = authorizableLookup.getTenant();
- break;
- default:
- throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this action.").build());
- }
- } catch (final ResourceNotFoundException e) {
- // if the underlying component is gone, use the controller to see if permissions should be allowed
- authorizable = controllerFacade;
- }
-
- // perform the authorization
- return authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
- }
-
- @Override
- public HistoryDTO getActions(final HistoryQueryDTO historyQueryDto) {
- // extract the query criteria
- final HistoryQuery historyQuery = new HistoryQuery();
- historyQuery.setStartDate(historyQueryDto.getStartDate());
- historyQuery.setEndDate(historyQueryDto.getEndDate());
- historyQuery.setSourceId(historyQueryDto.getSourceId());
- historyQuery.setUserIdentity(historyQueryDto.getUserIdentity());
- historyQuery.setOffset(historyQueryDto.getOffset());
- historyQuery.setCount(historyQueryDto.getCount());
- historyQuery.setSortColumn(historyQueryDto.getSortColumn());
- historyQuery.setSortOrder(historyQueryDto.getSortOrder());
-
- // perform the query
- final History history = auditService.getActions(historyQuery);
-
- // only retain authorized actions
- final HistoryDTO historyDto = dtoFactory.createHistoryDto(history);
- if (history.getActions() != null) {
- final List<ActionEntity> actionEntities = new ArrayList<>();
- for (final Action action : history.getActions()) {
- final AuthorizationResult result = authorizeAction(action);
- actionEntities.add(entityFactory.createActionEntity(dtoFactory.createActionDto(action), Result.Approved.equals(result.getResult())));
- }
- historyDto.setActions(actionEntities);
- }
-
- // create the response
- return historyDto;
- }
-
- @Override
- public ActionEntity getAction(final Integer actionId) {
- // get the action
- final Action action = auditService.getAction(actionId);
-
- // ensure the action was found
- if (action == null) {
- throw new ResourceNotFoundException(String.format("Unable to find action with id '%s'.", actionId));
- }
-
- final AuthorizationResult result = authorizeAction(action);
- final boolean authorized = Result.Approved.equals(result.getResult());
- if (!authorized) {
- throw new AccessDeniedException(result.getExplanation());
- }
-
- // return the action
- return entityFactory.createActionEntity(dtoFactory.createActionDto(action), authorized);
- }
-
- @Override
- public ComponentHistoryDTO getComponentHistory(final String componentId) {
- final Map<String, PropertyHistoryDTO> propertyHistoryDtos = new LinkedHashMap<>();
- final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(componentId);
-
- for (final Map.Entry<String, List<PreviousValue>> entry : propertyHistory.entrySet()) {
- final List<PreviousValueDTO> previousValueDtos = new ArrayList<>();
-
- for (final PreviousValue previousValue : entry.getValue()) {
- final PreviousValueDTO dto = new PreviousValueDTO();
- dto.setPreviousValue(previousValue.getPreviousValue());
- dto.setTimestamp(previousValue.getTimestamp());
- dto.setUserIdentity(previousValue.getUserIdentity());
- previousValueDtos.add(dto);
- }
-
- if (!previousValueDtos.isEmpty()) {
- final PropertyHistoryDTO propertyHistoryDto = new PropertyHistoryDTO();
- propertyHistoryDto.setPreviousValues(previousValueDtos);
- propertyHistoryDtos.put(entry.getKey(), propertyHistoryDto);
- }
- }
-
- final ComponentHistoryDTO history = new ComponentHistoryDTO();
- history.setComponentId(componentId);
- history.setPropertyHistory(propertyHistoryDtos);
-
- return history;
- }
-
- @Override
- public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
- final ProcessorNode processor = processorDAO.getProcessor(id);
- final ProcessorStatus processorStatus = controllerFacade.getProcessorStatus(id);
-
- // Generate Processor Diagnostics
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final ProcessorDiagnosticsDTO dto = controllerFacade.getProcessorDiagnostics(processor, processorStatus, bulletinRepository, serviceId -> {
- final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
- return createControllerServiceEntity(serviceNode, Collections.emptySet());
- });
-
- // Filter anything out of diagnostics that the user is not authorized to see.
- final List<JVMDiagnosticsSnapshotDTO> jvmDiagnosticsSnaphots = new ArrayList<>();
- final JVMDiagnosticsDTO jvmDiagnostics = dto.getJvmDiagnostics();
- jvmDiagnosticsSnaphots.add(jvmDiagnostics.getAggregateSnapshot());
-
- // filter controller-related information
- final boolean canReadController = authorizableLookup.getController().isAuthorized(authorizer, RequestAction.READ, user);
- if (!canReadController) {
- for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
- snapshot.setControllerDiagnostics(null);
- }
- }
-
- // filter system diagnostics information
- final boolean canReadSystem = authorizableLookup.getSystem().isAuthorized(authorizer, RequestAction.READ, user);
- if (!canReadSystem) {
- for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
- snapshot.setSystemDiagnosticsDto(null);
- }
- }
-
- final boolean canReadFlow = authorizableLookup.getFlow().isAuthorized(authorizer, RequestAction.READ, user);
- if (!canReadFlow) {
- for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
- snapshot.setFlowDiagnosticsDto(null);
- }
- }
-
- // filter connections
- final Predicate<ConnectionDiagnosticsDTO> connectionAuthorized = connectionDiagnostics -> {
- final String connectionId = connectionDiagnostics.getConnection().getId();
- return authorizableLookup.getConnection(connectionId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user);
- };
-
- // Filter incoming connections by what user is authorized to READ
- final Set<ConnectionDiagnosticsDTO> incoming = dto.getIncomingConnections();
- final Set<ConnectionDiagnosticsDTO> filteredIncoming = incoming.stream()
- .filter(connectionAuthorized)
- .collect(Collectors.toSet());
-
- dto.setIncomingConnections(filteredIncoming);
-
- // Filter outgoing connections by what user is authorized to READ
- final Set<ConnectionDiagnosticsDTO> outgoing = dto.getOutgoingConnections();
- final Set<ConnectionDiagnosticsDTO> filteredOutgoing = outgoing.stream()
- .filter(connectionAuthorized)
- .collect(Collectors.toSet());
- dto.setOutgoingConnections(filteredOutgoing);
-
- // Filter out any controller services that are referenced by the Processor
- final Set<ControllerServiceDiagnosticsDTO> referencedServices = dto.getReferencedControllerServices();
- final Set<ControllerServiceDiagnosticsDTO> filteredReferencedServices = referencedServices.stream()
- .filter(csDiagnostics -> {
- final String csId = csDiagnostics.getControllerService().getId();
- return authorizableLookup.getControllerService(csId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user);
- })
- .map(csDiagnostics -> {
- // Filter out any referencing components because those are generally not relevant from this context.
- final ControllerServiceDTO serviceDto = csDiagnostics.getControllerService().getComponent();
- if (serviceDto != null) {
- serviceDto.setReferencingComponents(null);
- }
- return csDiagnostics;
- })
- .collect(Collectors.toSet());
- dto.setReferencedControllerServices(filteredReferencedServices);
-
- final Revision revision = revisionManager.getRevision(id);
- final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revision);
- final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(processor);
- final List<BulletinEntity> bulletins = bulletinRepository.findBulletinsForSource(id).stream()
- .map(bulletin -> dtoFactory.createBulletinDto(bulletin))
- .map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissionsDto.getCanRead()))
- .collect(Collectors.toList());
-
- final ProcessorStatusDTO processorStatusDto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
- return entityFactory.createProcessorDiagnosticsEntity(dto, revisionDto, permissionsDto, processorStatusDto, bulletins);
- }
-
- @Override
- public boolean isClustered() {
- return controllerFacade.isClustered();
- }
-
- @Override
- public String getNodeId() {
- final NodeIdentifier nodeId = controllerFacade.getNodeId();
- if (nodeId != null) {
- return nodeId.getId();
- } else {
- return null;
- }
- }
-
- @Override
- public ClusterDTO getCluster() {
- // create cluster summary dto
- final ClusterDTO clusterDto = new ClusterDTO();
-
- // set current time
- clusterDto.setGenerated(new Date());
-
- // create node dtos
- final List<NodeDTO> nodeDtos = clusterCoordinator.getNodeIdentifiers().stream()
- .map(nodeId -> getNode(nodeId))
- .collect(Collectors.toList());
- clusterDto.setNodes(nodeDtos);
-
- return clusterDto;
- }
-
- @Override
- public NodeDTO getNode(final String nodeId) {
- final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId);
- return getNode(nodeIdentifier);
- }
-
- private NodeDTO getNode(final NodeIdentifier nodeId) {
- final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId);
- final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId);
- final Set<String> roles = getRoles(nodeId);
- final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);
- return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, roles);
- }
-
- private Set<String> getRoles(final NodeIdentifier nodeId) {
- final Set<String> roles = new HashSet<>();
- final String nodeAddress = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort();
-
- for (final String roleName : ClusterRoles.getAllRoles()) {
- final String leader = leaderElectionManager.getLeader(roleName);
- if (leader == null) {
- continue;
- }
-
- if (leader.equals(nodeAddress)) {
- roles.add(roleName);
- }
- }
-
- return roles;
- }
-
- @Override
- public void deleteNode(final String nodeId) {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- if (user == null) {
- throw new WebApplicationException(new Throwable("Unable to access details for current user."));
- }
-
- final String userDn = user.getIdentity();
- final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId);
- if (nodeIdentifier == null) {
- throw new UnknownNodeException("Cannot remove Node with ID " + nodeId + " because it is not part of the cluster");
- }
-
- final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier);
- if (!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && !nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) {
- throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId +
- " because it is not disconnected or offloaded, current state = " + nodeConnectionStatus.getState());
- }
-
- clusterCoordinator.removeNode(nodeIdentifier, userDn);
- heartbeatMonitor.removeHeartbeat(nodeIdentifier);
- }
-
- /* reusable function declarations for converting ids to tenant entities */
- private Function<String, TenantEntity> mapUserGroupIdToTenantEntity(final boolean enforceGroupExistence) {
- return userGroupId -> {
- final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroupId));
-
- final Group group;
- if (enforceGroupExistence || userGroupDAO.hasUserGroup(userGroupId)) {
- group = userGroupDAO.getUserGroup(userGroupId);
- } else {
- group = new Group.Builder().identifier(userGroupId).name("Group ID - " + userGroupId + " (removed externally)").build();
- }
-
- return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(group), userGroupRevision,
- dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
- };
- }
-
- private Function<String, TenantEntity> mapUserIdToTenantEntity(final boolean enforceUserExistence) {
- return userId -> {
- final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId));
-
- final User user;
- if (enforceUserExistence || userDAO.hasUser(userId)) {
- user = userDAO.getUser(userId);
- } else {
- user = new User.Builder().identifier(userId).identity("User ID - " + userId + " (removed externally)").build();
- }
-
- return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(user), userRevision,
- dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
- };
- }
-
-
- /* setters */
- public void setProperties(final NiFiProperties properties) {
- this.properties = properties;
- }
-
- public void setControllerFacade(final ControllerFacade controllerFacade) {
- this.controllerFacade = controllerFacade;
- }
-
- public void setRemoteProcessGroupDAO(final RemoteProcessGroupDAO remoteProcessGroupDAO) {
- this.remoteProcessGroupDAO = remoteProcessGroupDAO;
- }
-
- public void setLabelDAO(final LabelDAO labelDAO) {
- this.labelDAO = labelDAO;
- }
-
- public void setFunnelDAO(final FunnelDAO funnelDAO) {
- this.funnelDAO = funnelDAO;
- }
-
- public void setSnippetDAO(final SnippetDAO snippetDAO) {
- this.snippetDAO = snippetDAO;
- }
-
- public void setProcessorDAO(final ProcessorDAO processorDAO) {
- this.processorDAO = processorDAO;
- }
-
- public void setConnectionDAO(final ConnectionDAO connectionDAO) {
- this.connectionDAO = connectionDAO;
- }
-
- public void setAuditService(final AuditService auditService) {
- this.auditService = auditService;
- }
-
- public void setRevisionManager(final RevisionManager revisionManager) {
- this.revisionManager = revisionManager;
- }
-
- public void setDtoFactory(final DtoFactory dtoFactory) {
- this.dtoFactory = dtoFactory;
- }
-
- public void setEntityFactory(final EntityFactory entityFactory) {
- this.entityFactory = entityFactory;
- }
-
- public void setInputPortDAO(final PortDAO inputPortDAO) {
- this.inputPortDAO = inputPortDAO;
- }
-
- public void setOutputPortDAO(final PortDAO outputPortDAO) {
- this.outputPortDAO = outputPortDAO;
- }
-
- public void setProcessGroupDAO(final ProcessGroupDAO processGroupDAO) {
- this.processGroupDAO = processGroupDAO;
- }
-
- public void setControllerServiceDAO(final ControllerServiceDAO controllerServiceDAO) {
- this.controllerServiceDAO = controllerServiceDAO;
- }
-
- public void setReportingTaskDAO(final ReportingTaskDAO reportingTaskDAO) {
- this.reportingTaskDAO = reportingTaskDAO;
- }
-
- public void setTemplateDAO(final TemplateDAO templateDAO) {
- this.templateDAO = templateDAO;
- }
-
- public void setSnippetUtils(final SnippetUtils snippetUtils) {
- this.snippetUtils = snippetUtils;
- }
-
- public void setAuthorizableLookup(final AuthorizableLookup authorizableLookup) {
- this.authorizableLookup = authorizableLookup;
- }
-
- public void setAuthorizer(final Authorizer authorizer) {
- this.authorizer = authorizer;
- }
-
- public void setUserDAO(final UserDAO userDAO) {
- this.userDAO = userDAO;
- }
-
- public void setUserGroupDAO(final UserGroupDAO userGroupDAO) {
- this.userGroupDAO = userGroupDAO;
- }
-
- public void setAccessPolicyDAO(final AccessPolicyDAO accessPolicyDAO) {
- this.accessPolicyDAO = accessPolicyDAO;
- }
-
- public void setClusterCoordinator(final ClusterCoordinator coordinator) {
- this.clusterCoordinator = coordinator;
- }
-
- public void setHeartbeatMonitor(final HeartbeatMonitor heartbeatMonitor) {
- this.heartbeatMonitor = heartbeatMonitor;
- }
-
- public void setBulletinRepository(final BulletinRepository bulletinRepository) {
- this.bulletinRepository = bulletinRepository;
- }
-
- public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) {
- this.leaderElectionManager = leaderElectionManager;
- }
-
- public void setRegistryDAO(RegistryDAO registryDao) {
- this.registryDAO = registryDao;
- }
-
- public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) {
- this.flowRegistryClient = flowRegistryClient;
- }
-}