/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * Modifications to the original nifi code for the ONAP project are made * available under the Apache License, Version 2.0 */ package org.apache.nifi.web; import com.google.common.collect.Sets; import org.apache.commons.collections4.CollectionUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; import org.apache.nifi.action.FlowChangeAction; import org.apache.nifi.action.Operation; import org.apache.nifi.action.details.FlowChangePurgeDetails; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.AccessPolicy; import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.AuthorizationRequest; import org.apache.nifi.authorization.AuthorizationResult; import org.apache.nifi.authorization.AuthorizationResult.Result; import org.apache.nifi.authorization.AuthorizeAccess; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.Group; import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.User; import org.apache.nifi.authorization.UserContextKeys; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource; import org.apache.nifi.authorization.resource.OperationAuthorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; import org.apache.nifi.cluster.coordination.node.ClusterRoles; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.Counter; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.history.History; import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.history.PreviousValue; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.authorization.Permissions; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedFlowState; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.DifferenceType; import org.apache.nifi.registry.flow.diff.FlowComparator; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.util.BundleUtils; import org.apache.nifi.util.FlowDifferenceFilters; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.AccessPolicyDTO; import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO; import org.apache.nifi.web.api.dto.AffectedComponentDTO; import org.apache.nifi.web.api.dto.BucketDTO; import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ClusterDTO; import org.apache.nifi.web.api.dto.ComponentDTO; import org.apache.nifi.web.api.dto.ComponentDifferenceDTO; import org.apache.nifi.web.api.dto.ComponentHistoryDTO; import org.apache.nifi.web.api.dto.ComponentReferenceDTO; import org.apache.nifi.web.api.dto.ComponentRestrictionPermissionDTO; import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerConfigurationDTO; import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; import org.apache.nifi.web.api.dto.CountersSnapshotDTO; import org.apache.nifi.web.api.dto.DocumentedTypeDTO; import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.EntityFactory; import org.apache.nifi.web.api.dto.FlowConfigurationDTO; import org.apache.nifi.web.api.dto.FlowFileDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.PermissionsDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.PreviousValueDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.PropertyHistoryDTO; import org.apache.nifi.web.api.dto.RegistryDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.RequiredPermissionDTO; import org.apache.nifi.web.api.dto.ResourceDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.SnippetDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.UserDTO; import org.apache.nifi.web.api.dto.UserGroupDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.VersionedFlowDTO; import org.apache.nifi.web.api.dto.action.HistoryDTO; import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO; import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO; import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO; import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.AccessPolicyEntity; import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; import org.apache.nifi.web.api.entity.ActionEntity; import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.BucketEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ComponentReferenceEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ControllerBulletinsEntity; import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.CurrentUserEntity; import org.apache.nifi.web.api.entity.FlowComparisonEntity; import org.apache.nifi.web.api.entity.FlowConfigurationEntity; import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.LabelEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.PortStatusEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorStatusEntity; import org.apache.nifi.web.api.entity.RegistryClientEntity; import org.apache.nifi.web.api.entity.RegistryEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; import org.apache.nifi.web.api.entity.SnippetEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.TenantEntity; import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.api.entity.VariableRegistryEntity; import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.AccessPolicyDAO; import org.apache.nifi.web.dao.ConnectionDAO; import org.apache.nifi.web.dao.ControllerServiceDAO; import org.apache.nifi.web.dao.FunnelDAO; import org.apache.nifi.web.dao.LabelDAO; import org.apache.nifi.web.dao.PortDAO; import org.apache.nifi.web.dao.ProcessGroupDAO; import org.apache.nifi.web.dao.ProcessorDAO; import org.apache.nifi.web.dao.RegistryDAO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; import org.apache.nifi.web.dao.ReportingTaskDAO; import org.apache.nifi.web.dao.SnippetDAO; import org.apache.nifi.web.dao.TemplateDAO; import org.apache.nifi.web.dao.UserDAO; import org.apache.nifi.web.dao.UserGroupDAO; import org.apache.nifi.web.revision.DeleteRevisionTask; import org.apache.nifi.web.revision.ExpiredRevisionClaimException; import org.apache.nifi.web.revision.RevisionClaim; import org.apache.nifi.web.revision.RevisionManager; import org.apache.nifi.web.revision.RevisionUpdate; import org.apache.nifi.web.revision.StandardRevisionClaim; import org.apache.nifi.web.revision.StandardRevisionUpdate; import org.apache.nifi.web.revision.UpdateRevisionTask; import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; /** * Implementation of NiFiServiceFacade that performs revision checking. */ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class); private static final int VALIDATION_WAIT_MILLIS = 50; // nifi core components private ControllerFacade controllerFacade; private SnippetUtils snippetUtils; // revision manager private RevisionManager revisionManager; private BulletinRepository bulletinRepository; // data access objects private ProcessorDAO processorDAO; private ProcessGroupDAO processGroupDAO; private RemoteProcessGroupDAO remoteProcessGroupDAO; private LabelDAO labelDAO; private FunnelDAO funnelDAO; private SnippetDAO snippetDAO; private PortDAO inputPortDAO; private PortDAO outputPortDAO; private ConnectionDAO connectionDAO; private ControllerServiceDAO controllerServiceDAO; private ReportingTaskDAO reportingTaskDAO; private TemplateDAO templateDAO; private UserDAO userDAO; private UserGroupDAO userGroupDAO; private AccessPolicyDAO accessPolicyDAO; private RegistryDAO registryDAO; private ClusterCoordinator clusterCoordinator; private HeartbeatMonitor heartbeatMonitor; private LeaderElectionManager leaderElectionManager; // administrative services private AuditService auditService; // flow registry private FlowRegistryClient flowRegistryClient; // properties private NiFiProperties properties; private DtoFactory dtoFactory; private EntityFactory entityFactory; private Authorizer authorizer; private AuthorizableLookup authorizableLookup; // ----------------------------------------- // Synchronization methods // ----------------------------------------- @Override public void authorizeAccess(final AuthorizeAccess authorizeAccess) { authorizeAccess.authorize(authorizableLookup); } @Override public void verifyRevision(final Revision revision, final NiFiUser user) { final Revision curRevision = revisionManager.getRevision(revision.getComponentId()); if (revision.equals(curRevision)) { return; } throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified"); } @Override public void verifyRevisions(final Set revisions, final NiFiUser user) { for (final Revision revision : revisions) { verifyRevision(revision, user); } } @Override public Set getRevisionsFromGroup(final String groupId, final Function> getComponents) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); final Set componentIds = getComponents.apply(group); return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); } @Override public Set getRevisionsFromSnippet(final String snippetId) { final Snippet snippet = snippetDAO.getSnippet(snippetId); final Set componentIds = new HashSet<>(); componentIds.addAll(snippet.getProcessors().keySet()); componentIds.addAll(snippet.getFunnels().keySet()); componentIds.addAll(snippet.getLabels().keySet()); componentIds.addAll(snippet.getConnections().keySet()); componentIds.addAll(snippet.getInputPorts().keySet()); componentIds.addAll(snippet.getOutputPorts().keySet()); componentIds.addAll(snippet.getProcessGroups().keySet()); componentIds.addAll(snippet.getRemoteProcessGroups().keySet()); return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); } // ----------------------------------------- // Verification Operations // ----------------------------------------- @Override public void verifyListQueue(final String connectionId) { connectionDAO.verifyList(connectionId); } @Override public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) { connectionDAO.verifyCreate(groupId, connectionDTO); } @Override public void verifyUpdateConnection(final ConnectionDTO connectionDTO) { // if connection does not exist, then the update request is likely creating it // so we don't verify since it will fail if (connectionDAO.hasConnection(connectionDTO.getId())) { connectionDAO.verifyUpdate(connectionDTO); } else { connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO); } } @Override public void verifyDeleteConnection(final String connectionId) { connectionDAO.verifyDelete(connectionId); } @Override public void verifyDeleteFunnel(final String funnelId) { funnelDAO.verifyDelete(funnelId); } @Override public void verifyUpdateInputPort(final PortDTO inputPortDTO) { // if connection does not exist, then the update request is likely creating it // so we don't verify since it will fail if (inputPortDAO.hasPort(inputPortDTO.getId())) { inputPortDAO.verifyUpdate(inputPortDTO); } } @Override public void verifyDeleteInputPort(final String inputPortId) { inputPortDAO.verifyDelete(inputPortId); } @Override public void verifyUpdateOutputPort(final PortDTO outputPortDTO) { // if connection does not exist, then the update request is likely creating it // so we don't verify since it will fail if (outputPortDAO.hasPort(outputPortDTO.getId())) { outputPortDAO.verifyUpdate(outputPortDTO); } } @Override public void verifyDeleteOutputPort(final String outputPortId) { outputPortDAO.verifyDelete(outputPortId); } @Override public void verifyCreateProcessor(ProcessorDTO processorDTO) { processorDAO.verifyCreate(processorDTO); } @Override public void verifyUpdateProcessor(final ProcessorDTO processorDTO) { // if group does not exist, then the update request is likely creating it // so we don't verify since it will fail if (processorDAO.hasProcessor(processorDTO.getId())) { processorDAO.verifyUpdate(processorDTO); } else { verifyCreateProcessor(processorDTO); } } @Override public void verifyDeleteProcessor(final String processorId) { processorDAO.verifyDelete(processorId); } @Override public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set componentIds) { processGroupDAO.verifyScheduleComponents(groupId, state, componentIds); } @Override public void verifyEnableComponents(String processGroupId, ScheduledState state, Set componentIds) { processGroupDAO.verifyEnableComponents(processGroupId, state, componentIds); } @Override public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection serviceIds) { processGroupDAO.verifyActivateControllerServices(state, serviceIds); } @Override public void verifyDeleteProcessGroup(final String groupId) { processGroupDAO.verifyDelete(groupId); } @Override public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) { // if remote group does not exist, then the update request is likely creating it // so we don't verify since it will fail if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) { remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO); } } @Override public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); } @Override public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); } @Override public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) { remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId); } @Override public void verifyCreateControllerService(ControllerServiceDTO controllerServiceDTO) { controllerServiceDAO.verifyCreate(controllerServiceDTO); } @Override public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) { // if service does not exist, then the update request is likely creating it // so we don't verify since it will fail if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) { controllerServiceDAO.verifyUpdate(controllerServiceDTO); } else { verifyCreateControllerService(controllerServiceDTO); } } @Override public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); } @Override public void verifyDeleteControllerService(final String controllerServiceId) { controllerServiceDAO.verifyDelete(controllerServiceId); } @Override public void verifyCreateReportingTask(ReportingTaskDTO reportingTaskDTO) { reportingTaskDAO.verifyCreate(reportingTaskDTO); } @Override public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) { // if tasks does not exist, then the update request is likely creating it // so we don't verify since it will fail if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) { reportingTaskDAO.verifyUpdate(reportingTaskDTO); } else { verifyCreateReportingTask(reportingTaskDTO); } } @Override public void verifyDeleteReportingTask(final String reportingTaskId) { reportingTaskDAO.verifyDelete(reportingTaskId); } // ----------------------------------------- // Write Operations // ----------------------------------------- @Override public AccessPolicyEntity updateAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) { final Authorizable authorizable = authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, authorizable, () -> accessPolicyDAO.updateAccessPolicy(accessPolicyDTO), accessPolicy -> { final Set users = accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); final Set userGroups = accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); return dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference); }); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizable); return entityFactory.createAccessPolicyEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); } @Override public UserEntity updateUser(final Revision revision, final UserDTO userDTO) { final Authorizable usersAuthorizable = authorizableLookup.getTenant(); final Set groups = userGroupDAO.getUserGroupsForUser(userDTO.getId()); final Set policies = userGroupDAO.getAccessPoliciesForUser(userDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, usersAuthorizable, () -> userDAO.updateUser(userDTO), user -> { final Set tenantEntities = groups.stream().map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); final Set policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); return dtoFactory.createUserDto(user, tenantEntities, policyEntities); }); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(usersAuthorizable); return entityFactory.createUserEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); } @Override public UserGroupEntity updateUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) { final Authorizable userGroupsAuthorizable = authorizableLookup.getTenant(); final Set policies = userGroupDAO.getAccessPoliciesForUserGroup(userGroupDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, userGroupsAuthorizable, () -> userGroupDAO.updateUserGroup(userGroupDTO), userGroup -> { final Set tenantEntities = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); final Set policyEntities = policies.stream().map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); return dtoFactory.createUserGroupDto(userGroup, tenantEntities, policyEntities); } ); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(userGroupsAuthorizable); return entityFactory.createUserGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); } @Override public ConnectionEntity updateConnection(final Revision revision, final ConnectionDTO connectionDTO) { final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId()); final RevisionUpdate snapshot = updateComponent( revision, connectionNode, () -> connectionDAO.updateConnection(connectionDTO), connection -> dtoFactory.createConnectionDto(connection)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connectionNode); final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionNode.getIdentifier())); return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status); } @Override public ProcessorEntity updateProcessor(final Revision revision, final ProcessorDTO processorDTO) { // get the component, ensure we have access to it, and perform the update request final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, processorNode, () -> processorDAO.updateProcessor(processorDTO), proc -> { awaitValidationCompletion(proc); return dtoFactory.createProcessorDto(proc); }); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processorNode); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processorNode)); final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorNode.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); } private void awaitValidationCompletion(final ComponentNode component) { component.getValidationStatus(VALIDATION_WAIT_MILLIS, TimeUnit.MILLISECONDS); } @Override public LabelEntity updateLabel(final Revision revision, final LabelDTO labelDTO) { final Label labelNode = labelDAO.getLabel(labelDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, labelNode, () -> labelDAO.updateLabel(labelDTO), label -> dtoFactory.createLabelDto(label)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(labelNode); return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); } @Override public FunnelEntity updateFunnel(final Revision revision, final FunnelDTO funnelDTO) { final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, funnelNode, () -> funnelDAO.updateFunnel(funnelDTO), funnel -> dtoFactory.createFunnelDto(funnel)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnelNode); return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); } /** * Updates a component with the given revision, using the provided supplier to call * into the appropriate DAO and the provided function to convert the component into a DTO. * * @param revision the current revision * @param daoUpdate a Supplier that will update the component via the appropriate DAO * @param dtoCreation a Function to convert a component into a dao * @param the DTO Type of the updated component * @param the Component Type of the updated component * @return A RevisionUpdate that represents the new configuration */ private RevisionUpdate updateComponent(final Revision revision, final Authorizable authorizable, final Supplier daoUpdate, final Function dtoCreation) { try { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask() { @Override public RevisionUpdate update() { // get the updated component final C component = daoUpdate.get(); // save updated controller controllerFacade.save(); final D dto = dtoCreation.apply(component); final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastModification); } }); return updatedComponent; } catch (final ExpiredRevisionClaimException erce) { throw new InvalidRevisionException("Failed to update component " + authorizable, erce); } } @Override public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set affectedComponentIds) { // if snippet does not exist, then the update request is likely creating it // so we don't verify since it will fail if (snippetDAO.hasSnippet(snippetDto.getId())) { snippetDAO.verifyUpdateSnippetComponent(snippetDto); } } @Override public SnippetEntity updateSnippet(final Set revisions, final SnippetDTO snippetDto) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions); final RevisionUpdate snapshot; try { snapshot = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask() { @Override public RevisionUpdate update() { // get the updated component final Snippet snippet = snippetDAO.updateSnippetComponents(snippetDto); // drop the snippet snippetDAO.dropSnippet(snippet.getId()); // save updated controller controllerFacade.save(); // increment the revisions final Set updatedRevisions = revisions.stream().map(revision -> { final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); return currentRevision.incrementRevision(revision.getClientId()); }).collect(Collectors.toSet()); final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); return new StandardRevisionUpdate<>(dto, null, updatedRevisions); } }); } catch (final ExpiredRevisionClaimException e) { throw new InvalidRevisionException("Failed to update Snippet", e); } return entityFactory.createSnippetEntity(snapshot.getComponent()); } @Override public PortEntity updateInputPort(final Revision revision, final PortDTO inputPortDTO) { final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, inputPortNode, () -> inputPortDAO.updatePort(inputPortDTO), port -> dtoFactory.createPortDto(port)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPortNode); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPortNode)); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortNode.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); } @Override public PortEntity updateOutputPort(final Revision revision, final PortDTO outputPortDTO) { final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, outputPortNode, () -> outputPortDAO.updatePort(outputPortDTO), port -> dtoFactory.createPortDto(port)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPortNode); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPortNode), NiFiUserUtils.getNiFiUser()); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortNode.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); } @Override public RemoteProcessGroupEntity updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) { final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); final RevisionUpdate snapshot = updateComponent( revision, remoteProcessGroupNode, () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO), remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupNode, controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroupNode.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, permissions, operatePermissions, status, bulletinEntities); } @Override public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort( final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); final RevisionUpdate snapshot = updateComponent( revision, remoteProcessGroupNode, () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions); } @Override public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort( final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); final RevisionUpdate snapshot = updateComponent( revision, remoteProcessGroupNode, () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroupNode); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroupNode)); final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, permissions, operatePermissions); } @Override public Set getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); if (group == null) { throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); } final Map variableMap = new HashMap<>(); variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null .map(VariableEntity::getVariable) .forEach(var -> variableMap.put(var.getName(), var.getValue())); final Set affectedComponentDtos = new HashSet<>(); final Set updatedVariableNames = getUpdatedVariables(group, variableMap); for (final String variableName : updatedVariableNames) { final Set affectedComponents = group.getComponentsAffectedByVariable(variableName); for (final ComponentNode component : affectedComponents) { if (component instanceof ProcessorNode) { final ProcessorNode procNode = (ProcessorNode) component; if (procNode.isRunning()) { affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode)); } } else if (component instanceof ControllerServiceNode) { final ControllerServiceNode serviceNode = (ControllerServiceNode) component; if (serviceNode.isActive()) { affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode)); } } else { throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable"); } } } return affectedComponentDtos; } @Override public Set getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); if (group == null) { throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); } final Map variableMap = new HashMap<>(); variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null .map(VariableEntity::getVariable) .forEach(var -> variableMap.put(var.getName(), var.getValue())); final Set affectedComponentEntities = new HashSet<>(); final Set updatedVariableNames = getUpdatedVariables(group, variableMap); for (final String variableName : updatedVariableNames) { final Set affectedComponents = group.getComponentsAffectedByVariable(variableName); affectedComponentEntities.addAll(dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager)); } return affectedComponentEntities; } private Set getUpdatedVariables(final ProcessGroup group, final Map newVariableValues) { final Set updatedVariableNames = new HashSet<>(); final ComponentVariableRegistry registry = group.getVariableRegistry(); for (final Map.Entry entry : newVariableValues.entrySet()) { final String varName = entry.getKey(); final String newValue = entry.getValue(); final String curValue = registry.getVariableValue(varName); if (!Objects.equals(newValue, curValue)) { updatedVariableNames.add(varName); } } return updatedVariableNames; } @Override public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) { final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); final RevisionUpdate snapshot = updateComponent(revision, processGroupNode, () -> processGroupDAO.updateVariableRegistry(variableRegistryDto), processGroup -> dtoFactory.createVariableRegistryDto(processGroup, revisionManager)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions); } @Override public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) { final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, processGroupNode, () -> processGroupDAO.updateProcessGroup(processGroupDTO), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities); } @Override public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) { if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) { processGroupDAO.verifyUpdate(processGroupDTO); } } @Override public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map componentRevisions) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new UpdateRevisionTask() { @Override public RevisionUpdate update() { // schedule the components processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet()); // update the revisions final Map updatedRevisions = new HashMap<>(); for (final Revision revision : componentRevisions.values()) { final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); } // save controllerFacade.save(); // gather details for response final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); entity.setId(processGroupId); entity.setState(state.name()); return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); } }); return updatedComponent.getComponent(); } @Override public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map componentRevisions) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new UpdateRevisionTask() { @Override public RevisionUpdate update() { // schedule the components processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); // update the revisions final Map updatedRevisions = new HashMap<>(); for (final Revision revision : componentRevisions.values()) { final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); } // save controllerFacade.save(); // gather details for response final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); entity.setId(processGroupId); entity.setState(state.name()); return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); } }); return updatedComponent.getComponent(); } @Override public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map serviceRevisions) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final RevisionUpdate updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user, new UpdateRevisionTask() { @Override public RevisionUpdate update() { // schedule the components processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet()); // update the revisions final Map updatedRevisions = new HashMap<>(); for (final Revision revision : serviceRevisions.values()) { final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); } // save controllerFacade.save(); // gather details for response final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity(); entity.setId(processGroupId); entity.setState(state.name()); return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); } }); return updatedComponent.getComponent(); } @Override public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) { final RevisionUpdate updatedComponent = updateComponent( revision, controllerFacade, () -> { if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) { controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount()); } if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) { controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount()); } return controllerConfigurationDTO; }, controller -> dtoFactory.createControllerConfigurationDto(controllerFacade)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade); final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(updatedComponent.getLastModification()); return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions); } @Override public NodeDTO updateNode(final NodeDTO nodeDTO) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); if (user == null) { throw new WebApplicationException(new Throwable("Unable to access details for current user.")); } final String userDn = user.getIdentity(); final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeDTO.getNodeId()); if (nodeId == null) { throw new UnknownNodeException("No node exists with ID " + nodeDTO.getNodeId()); } if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { clusterCoordinator.requestNodeConnect(nodeId, userDn); } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) { clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED, "User " + userDn + " requested that node be offloaded"); } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "User " + userDn + " requested that node be disconnected from cluster"); } return getNode(nodeId); } @Override public CounterDTO updateCounter(final String counterId) { return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId)); } @Override public void verifyCanClearProcessorState(final String processorId) { processorDAO.verifyClearState(processorId); } @Override public void clearProcessorState(final String processorId) { processorDAO.clearState(processorId); } @Override public void verifyCanClearControllerServiceState(final String controllerServiceId) { controllerServiceDAO.verifyClearState(controllerServiceId); } @Override public void clearControllerServiceState(final String controllerServiceId) { controllerServiceDAO.clearState(controllerServiceId); } @Override public void verifyCanClearReportingTaskState(final String reportingTaskId) { reportingTaskDAO.verifyClearState(reportingTaskId); } @Override public void clearReportingTaskState(final String reportingTaskId) { reportingTaskDAO.clearState(reportingTaskId); } @Override public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) { final Connection connection = connectionDAO.getConnection(connectionId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); final ConnectionDTO snapshot = deleteComponent( revision, connection.getResource(), () -> connectionDAO.deleteConnection(connectionId), false, // no policies to remove dtoFactory.createConnectionDto(connection)); return entityFactory.createConnectionEntity(snapshot, null, permissions, null); } @Override public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) { return dtoFactory.createDropRequestDTO(connectionDAO.deleteFlowFileDropRequest(connectionId, dropRequestId)); } @Override public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(connectionId, listingRequestId)); // include whether the source and destination are running if (connection.getSource() != null) { listRequest.setSourceRunning(connection.getSource().isRunning()); } if (connection.getDestination() != null) { listRequest.setDestinationRunning(connection.getDestination().isRunning()); } return listRequest; } @Override public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) { final ProcessorNode processor = processorDAO.getProcessor(processorId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); final ProcessorDTO snapshot = deleteComponent( revision, processor.getResource(), () -> processorDAO.deleteProcessor(processorId), true, dtoFactory.createProcessorDto(processor)); return entityFactory.createProcessorEntity(snapshot, null, permissions, operatePermissions, null, null); } @Override public ProcessorEntity terminateProcessor(final String processorId) { processorDAO.terminate(processorId); return getProcessor(processorId); } @Override public void verifyTerminateProcessor(final String processorId) { processorDAO.verifyTerminate(processorId); } @Override public LabelEntity deleteLabel(final Revision revision, final String labelId) { final Label label = labelDAO.getLabel(labelId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); final LabelDTO snapshot = deleteComponent( revision, label.getResource(), () -> labelDAO.deleteLabel(labelId), true, dtoFactory.createLabelDto(label)); return entityFactory.createLabelEntity(snapshot, null, permissions); } @Override public UserEntity deleteUser(final Revision revision, final String userId) { final User user = userDAO.getUser(userId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); final Set userGroups = user != null ? userGroupDAO.getUserGroupsForUser(userId).stream() .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null; final Set policyEntities = user != null ? userGroupDAO.getAccessPoliciesForUser(userId).stream() .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()) : null; final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userId; final UserDTO snapshot = deleteComponent( revision, new Resource() { @Override public String getIdentifier() { return resourceIdentifier; } @Override public String getName() { return resourceIdentifier; } @Override public String getSafeDescription() { return "User " + userId; } }, () -> userDAO.deleteUser(userId), false, // no user specific policies to remove dtoFactory.createUserDto(user, userGroups, policyEntities)); return entityFactory.createUserEntity(snapshot, null, permissions); } @Override public UserGroupEntity deleteUserGroup(final Revision revision, final String userGroupId) { final Group userGroup = userGroupDAO.getUserGroup(userGroupId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); final Set users = userGroup != null ? userGroup.getUsers().stream() .map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null; final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream() .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userGroupId; final UserGroupDTO snapshot = deleteComponent( revision, new Resource() { @Override public String getIdentifier() { return resourceIdentifier; } @Override public String getName() { return resourceIdentifier; } @Override public String getSafeDescription() { return "User Group " + userGroupId; } }, () -> userGroupDAO.deleteUserGroup(userGroupId), false, // no user group specific policies to remove dtoFactory.createUserGroupDto(userGroup, users, policyEntities)); return entityFactory.createUserGroupEntity(snapshot, null, permissions); } @Override public AccessPolicyEntity deleteAccessPolicy(final Revision revision, final String accessPolicyId) { final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyId)); final Set userGroups = accessPolicy != null ? accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()) : null; final Set users = accessPolicy != null ? accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null; final AccessPolicyDTO snapshot = deleteComponent( revision, new Resource() { @Override public String getIdentifier() { return accessPolicy.getResource(); } @Override public String getName() { return accessPolicy.getResource(); } @Override public String getSafeDescription() { return "Policy " + accessPolicyId; } }, () -> accessPolicyDAO.deleteAccessPolicy(accessPolicyId), false, // no need to clean up any policies as it's already been removed above dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference)); return entityFactory.createAccessPolicyEntity(snapshot, null, permissions); } @Override public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) { final Funnel funnel = funnelDAO.getFunnel(funnelId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); final FunnelDTO snapshot = deleteComponent( revision, funnel.getResource(), () -> funnelDAO.deleteFunnel(funnelId), true, dtoFactory.createFunnelDto(funnel)); return entityFactory.createFunnelEntity(snapshot, null, permissions); } /** * Deletes a component using the Optimistic Locking Manager * * @param revision the current revision * @param resource the resource being removed * @param deleteAction the action that deletes the component via the appropriate DAO object * @param cleanUpPolicies whether or not the policies for this resource should be removed as well - not necessary when there are * no component specific policies or if the policies of the component are inherited * @return a dto that represents the new configuration */ private D deleteComponent(final Revision revision, final Resource resource, final Runnable deleteAction, final boolean cleanUpPolicies, final D dto) { final RevisionClaim claim = new StandardRevisionClaim(revision); final NiFiUser user = NiFiUserUtils.getNiFiUser(); return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask() { @Override public D performTask() { logger.debug("Attempting to delete component {} with claim {}", resource.getIdentifier(), claim); // run the delete action deleteAction.run(); // save the flow controllerFacade.save(); logger.debug("Deletion of component {} was successful", resource.getIdentifier()); if (cleanUpPolicies) { cleanUpPolicies(resource); } return dto; } }); } /** * Clean up the policies for the specified component resource. * * @param componentResource the resource for the component */ private void cleanUpPolicies(final Resource componentResource) { // ensure the authorizer supports configuration if (accessPolicyDAO.supportsConfigurableAuthorizer()) { final List resources = new ArrayList<>(); resources.add(componentResource); resources.add(ResourceFactory.getDataResource(componentResource)); resources.add(ResourceFactory.getProvenanceDataResource(componentResource)); resources.add(ResourceFactory.getDataTransferResource(componentResource)); resources.add(ResourceFactory.getPolicyResource(componentResource)); for (final Resource resource : resources) { for (final RequestAction action : RequestAction.values()) { try { // since the component is being deleted, also delete any relevant access policies final AccessPolicy readPolicy = accessPolicyDAO.getAccessPolicy(action, resource.getIdentifier()); if (readPolicy != null) { accessPolicyDAO.deleteAccessPolicy(readPolicy.getIdentifier()); } } catch (final Exception e) { logger.warn(String.format("Unable to remove access policy for %s %s after component removal.", action, resource.getIdentifier()), e); } } } } } @Override public void verifyDeleteSnippet(final String snippetId, final Set affectedComponentIds) { snippetDAO.verifyDeleteSnippetComponents(snippetId); } @Override public SnippetEntity deleteSnippet(final Set revisions, final String snippetId) { final Snippet snippet = snippetDAO.getSnippet(snippetId); // grab the resources in the snippet so we can delete the policies afterwards final Set snippetResources = new HashSet<>(); snippet.getProcessors().keySet().forEach(id -> snippetResources.add(processorDAO.getProcessor(id).getResource())); snippet.getInputPorts().keySet().forEach(id -> snippetResources.add(inputPortDAO.getPort(id).getResource())); snippet.getOutputPorts().keySet().forEach(id -> snippetResources.add(outputPortDAO.getPort(id).getResource())); snippet.getFunnels().keySet().forEach(id -> snippetResources.add(funnelDAO.getFunnel(id).getResource())); snippet.getLabels().keySet().forEach(id -> snippetResources.add(labelDAO.getLabel(id).getResource())); snippet.getRemoteProcessGroups().keySet().forEach(id -> snippetResources.add(remoteProcessGroupDAO.getRemoteProcessGroup(id).getResource())); snippet.getProcessGroups().keySet().forEach(id -> { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id); // add the process group snippetResources.add(processGroup.getResource()); // add each encapsulated component processGroup.findAllProcessors().forEach(processor -> snippetResources.add(processor.getResource())); processGroup.findAllInputPorts().forEach(inputPort -> snippetResources.add(inputPort.getResource())); processGroup.findAllOutputPorts().forEach(outputPort -> snippetResources.add(outputPort.getResource())); processGroup.findAllFunnels().forEach(funnel -> snippetResources.add(funnel.getResource())); processGroup.findAllLabels().forEach(label -> snippetResources.add(label.getResource())); processGroup.findAllProcessGroups().forEach(childGroup -> snippetResources.add(childGroup.getResource())); processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> snippetResources.add(remoteProcessGroup.getResource())); processGroup.findAllTemplates().forEach(template -> snippetResources.add(template.getResource())); processGroup.findAllControllerServices().forEach(controllerService -> snippetResources.add(controllerService.getResource())); }); final NiFiUser user = NiFiUserUtils.getNiFiUser(); final RevisionClaim claim = new StandardRevisionClaim(revisions); final SnippetDTO dto = revisionManager.deleteRevision(claim, user, new DeleteRevisionTask() { @Override public SnippetDTO performTask() { // delete the components in the snippet snippetDAO.deleteSnippetComponents(snippetId); // drop the snippet snippetDAO.dropSnippet(snippetId); // save controllerFacade.save(); // create the dto for the snippet that was just removed return dtoFactory.createSnippetDto(snippet); } }); // clean up component policies snippetResources.forEach(resource -> cleanUpPolicies(resource)); return entityFactory.createSnippetEntity(dto); } @Override public PortEntity deleteInputPort(final Revision revision, final String inputPortId) { final Port port = inputPortDAO.getPort(inputPortId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); final PortDTO snapshot = deleteComponent( revision, port.getResource(), () -> inputPortDAO.deletePort(inputPortId), true, dtoFactory.createPortDto(port)); return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null); } @Override public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) { final Port port = outputPortDAO.getPort(outputPortId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); final PortDTO snapshot = deleteComponent( revision, port.getResource(), () -> outputPortDAO.deletePort(outputPortId), true, dtoFactory.createPortDto(port)); return entityFactory.createPortEntity(snapshot, null, permissions, operatePermissions, null, null); } @Override public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); // grab the resources in the snippet so we can delete the policies afterwards final Set groupResources = new HashSet<>(); processGroup.findAllProcessors().forEach(processor -> groupResources.add(processor.getResource())); processGroup.findAllInputPorts().forEach(inputPort -> groupResources.add(inputPort.getResource())); processGroup.findAllOutputPorts().forEach(outputPort -> groupResources.add(outputPort.getResource())); processGroup.findAllFunnels().forEach(funnel -> groupResources.add(funnel.getResource())); processGroup.findAllLabels().forEach(label -> groupResources.add(label.getResource())); processGroup.findAllProcessGroups().forEach(childGroup -> groupResources.add(childGroup.getResource())); processGroup.findAllRemoteProcessGroups().forEach(remoteProcessGroup -> groupResources.add(remoteProcessGroup.getResource())); processGroup.findAllTemplates().forEach(template -> groupResources.add(template.getResource())); processGroup.findAllControllerServices().forEach(controllerService -> groupResources.add(controllerService.getResource())); final ProcessGroupDTO snapshot = deleteComponent( revision, processGroup.getResource(), () -> processGroupDAO.deleteProcessGroup(groupId), true, dtoFactory.createProcessGroupDto(processGroup)); // delete all applicable component policies groupResources.forEach(groupResource -> cleanUpPolicies(groupResource)); return entityFactory.createProcessGroupEntity(snapshot, null, permissions, null, null); } @Override public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) { final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); final RemoteProcessGroupDTO snapshot = deleteComponent( revision, remoteProcessGroup.getResource(), () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId), true, dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); return entityFactory.createRemoteProcessGroupEntity(snapshot, null, permissions, operatePermissions, null, null); } @Override public void deleteTemplate(final String id) { // delete the template and save the flow templateDAO.deleteTemplate(id); controllerFacade.save(); } @Override public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) { final RevisionUpdate snapshot = createComponent( revision, connectionDTO, () -> connectionDAO.createConnection(groupId, connectionDTO), connection -> dtoFactory.createConnectionDto(connection)); final Connection connection = connectionDAO.getConnection(connectionDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId())); return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status); } @Override public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) { return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId)); } @Override public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(connectionId, listingRequestId)); // include whether the source and destination are running if (connection.getSource() != null) { listRequest.setSourceRunning(connection.getSource().isRunning()); } if (connection.getDestination() != null) { listRequest.setDestinationRunning(connection.getDestination().isRunning()); } return listRequest; } @Override public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { final RevisionUpdate snapshot = createComponent( revision, processorDTO, () -> processorDAO.createProcessor(groupId, processorDTO), processor -> { awaitValidationCompletion(processor); return dtoFactory.createProcessorDto(processor); }); final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); } @Override public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) { final RevisionUpdate snapshot = createComponent( revision, labelDTO, () -> labelDAO.createLabel(groupId, labelDTO), label -> dtoFactory.createLabelDto(label)); final Label label = labelDAO.getLabel(labelDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); } /** * Creates a component using the optimistic locking manager. * * @param componentDto the DTO that will be used to create the component * @param daoCreation A Supplier that will create the NiFi Component to use * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO * @param the DTO Type * @param the NiFi Component Type * @return a RevisionUpdate that represents the updated configuration */ private RevisionUpdate createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier daoCreation, final Function dtoCreation) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); // read lock on the containing group // request claim for component to be created... revision already verified (version == 0) final RevisionClaim claim = new StandardRevisionClaim(revision); // update revision through revision manager return revisionManager.updateRevision(claim, user, () -> { // add the component final C component = daoCreation.get(); // save the flow controllerFacade.save(); final D dto = dtoCreation.apply(component); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); } @Override public BulletinEntity createBulletin(final BulletinDTO bulletinDTO, final Boolean canRead){ final Bulletin bulletin = BulletinFactory.createBulletin(bulletinDTO.getCategory(),bulletinDTO.getLevel(),bulletinDTO.getMessage()); bulletinRepository.addBulletin(bulletin); return entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin),canRead); } @Override public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) { final RevisionUpdate snapshot = createComponent( revision, funnelDTO, () -> funnelDAO.createFunnel(groupId, funnelDTO), funnel -> dtoFactory.createFunnelDto(funnel)); final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel); return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions); } @Override public AccessPolicyEntity createAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) { final Authorizable tenantAuthorizable = authorizableLookup.getTenant(); final String creator = NiFiUserUtils.getNiFiUserIdentity(); final AccessPolicy newAccessPolicy = accessPolicyDAO.createAccessPolicy(accessPolicyDTO); final ComponentReferenceEntity componentReference = createComponentReferenceEntity(newAccessPolicy.getResource()); final AccessPolicyDTO newAccessPolicyDto = dtoFactory.createAccessPolicyDto(newAccessPolicy, newAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()), newAccessPolicy.getUsers().stream().map(userId -> { final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId)); return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(userDAO.getUser(userId)), userRevision, dtoFactory.createPermissionsDto(tenantAuthorizable)); }).collect(Collectors.toSet()), componentReference); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId())); return entityFactory.createAccessPolicyEntity(newAccessPolicyDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); } @Override public UserEntity createUser(final Revision revision, final UserDTO userDTO) { final String creator = NiFiUserUtils.getNiFiUserIdentity(); final User newUser = userDAO.createUser(userDTO); final Set tenantEntities = userGroupDAO.getUserGroupsForUser(newUser.getIdentifier()).stream() .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()); final Set policyEntities = userGroupDAO.getAccessPoliciesForUser(newUser.getIdentifier()).stream() .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); final UserDTO newUserDto = dtoFactory.createUserDto(newUser, tenantEntities, policyEntities); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); return entityFactory.createUserEntity(newUserDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); } private ComponentReferenceEntity createComponentReferenceEntity(final String resource) { ComponentReferenceEntity componentReferenceEntity = null; try { // get the component authorizable Authorizable componentAuthorizable = authorizableLookup.getAuthorizableFromResource(resource); // if this represents an authorizable whose policy permissions are enforced through the base resource, // get the underlying base authorizable for the component reference if (componentAuthorizable instanceof EnforcePolicyPermissionsThroughBaseResource) { componentAuthorizable = ((EnforcePolicyPermissionsThroughBaseResource) componentAuthorizable).getBaseAuthorizable(); } final ComponentReferenceDTO componentReference = dtoFactory.createComponentReferenceDto(componentAuthorizable); if (componentReference != null) { final PermissionsDTO componentReferencePermissions = dtoFactory.createPermissionsDto(componentAuthorizable); final RevisionDTO componentReferenceRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(componentReference.getId())); componentReferenceEntity = entityFactory.createComponentReferenceEntity(componentReference, componentReferenceRevision, componentReferencePermissions); } } catch (final ResourceNotFoundException e) { // component not found for the specified resource } return componentReferenceEntity; } private AccessPolicySummaryEntity createAccessPolicySummaryEntity(final AccessPolicy ap) { final ComponentReferenceEntity componentReference = createComponentReferenceEntity(ap.getResource()); final AccessPolicySummaryDTO apSummary = dtoFactory.createAccessPolicySummaryDto(ap, componentReference); final PermissionsDTO apPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(ap.getIdentifier())); final RevisionDTO apRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(ap.getIdentifier())); return entityFactory.createAccessPolicySummaryEntity(apSummary, apRevision, apPermissions); } @Override public UserGroupEntity createUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) { final String creator = NiFiUserUtils.getNiFiUserIdentity(); final Group newUserGroup = userGroupDAO.createUserGroup(userGroupDTO); final Set tenantEntities = newUserGroup.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()); final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(newUserGroup.getIdentifier()).stream() .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); final UserGroupDTO newUserGroupDto = dtoFactory.createUserGroupDto(newUserGroup, tenantEntities, policyEntities); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); return entityFactory.createUserGroupEntity(newUserGroupDto, dtoFactory.createRevisionDTO(new FlowModification(revision, creator)), permissions); } private void validateSnippetContents(final FlowSnippetDTO flow) { // validate any processors if (flow.getProcessors() != null) { for (final ProcessorDTO processorDTO : flow.getProcessors()) { final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); processorDTO.setValidationStatus(processorNode.getValidationStatus().name()); final Collection validationErrors = processorNode.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List errors = new ArrayList<>(); for (final ValidationResult validationResult : validationErrors) { errors.add(validationResult.toString()); } processorDTO.setValidationErrors(errors); } } } if (flow.getInputPorts() != null) { for (final PortDTO portDTO : flow.getInputPorts()) { final Port port = inputPortDAO.getPort(portDTO.getId()); final Collection validationErrors = port.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List errors = new ArrayList<>(); for (final ValidationResult validationResult : validationErrors) { errors.add(validationResult.toString()); } portDTO.setValidationErrors(errors); } } } if (flow.getOutputPorts() != null) { for (final PortDTO portDTO : flow.getOutputPorts()) { final Port port = outputPortDAO.getPort(portDTO.getId()); final Collection validationErrors = port.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List errors = new ArrayList<>(); for (final ValidationResult validationResult : validationErrors) { errors.add(validationResult.toString()); } portDTO.setValidationErrors(errors); } } } // get any remote process group issues if (flow.getRemoteProcessGroups() != null) { for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) { final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); if (remoteProcessGroup.getAuthorizationIssue() != null) { remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue())); } } } } @Override public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) { // create the new snippet final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); // save the flow controllerFacade.save(); // drop the snippet snippetDAO.dropSnippet(snippetId); // post process new flow snippet final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); final FlowEntity flowEntity = new FlowEntity(); flowEntity.setFlow(flowDto); return flowEntity; } @Override public SnippetEntity createSnippet(final SnippetDTO snippetDTO) { // add the component final Snippet snippet = snippetDAO.createSnippet(snippetDTO); // save the flow controllerFacade.save(); final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); final RevisionUpdate snapshot = new StandardRevisionUpdate<>(dto, null); return entityFactory.createSnippetEntity(snapshot.getComponent()); } @Override public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) { final RevisionUpdate snapshot = createComponent( revision, inputPortDTO, () -> inputPortDAO.createPort(groupId, inputPortDTO), port -> dtoFactory.createPortDto(port)); final Port port = inputPortDAO.getPort(inputPortDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); } @Override public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) { final RevisionUpdate snapshot = createComponent( revision, outputPortDTO, () -> outputPortDAO.createPort(groupId, outputPortDTO), port -> dtoFactory.createPortDto(port)); final Port port = outputPortDAO.getPort(outputPortDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port)); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); } @Override public ProcessGroupEntity createProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) { final RevisionUpdate snapshot = createComponent( revision, processGroupDTO, () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, status, bulletinEntities); } @Override public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) { final RevisionUpdate snapshot = createComponent( revision, remoteProcessGroupDTO, () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO), remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteProcessGroup); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteProcessGroup)); final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroup, controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, status, bulletinEntities); } @Override public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) { final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); RemoteGroupPort port = rpg.getInputPort(remotePortId); if (port != null) { return port.hasIncomingConnection(); } port = rpg.getOutputPort(remotePortId); if (port != null) { return !port.getConnections().isEmpty(); } throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId); } @Override public void verifyCanAddTemplate(String groupId, String name) { templateDAO.verifyCanAddTemplate(name, groupId); } @Override public void verifyComponentTypes(FlowSnippetDTO snippet) { templateDAO.verifyComponentTypes(snippet); } @Override public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) { controllerFacade.verifyComponentTypes(versionedGroup); } @Override public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); verifyImportProcessGroup(versionControlInfo, contents, group); } private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) { if (group == null) { return; } final VersionControlInformation vci = group.getVersionControlInformation(); if (vci != null) { // Note that we do not compare the Registry ID here because there could be two registry clients // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance).. if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier()) && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) { throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. " + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A."); } } final Set childGroups = contents.getProcessGroups(); if (childGroups != null) { for (final VersionedProcessGroup childGroup : childGroups) { final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates(); if (childCoordinates != null) { final VersionControlInformationDTO childVci = new VersionControlInformationDTO(); childVci.setBucketId(childCoordinates.getBucketId()); childVci.setFlowId(childCoordinates.getFlowId()); verifyImportProcessGroup(childVci, childGroup, group); } } } verifyImportProcessGroup(vciDto, contents, group.getParent()); } @Override public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional idGenerationSeed) { // get the specified snippet final Snippet snippet = snippetDAO.getSnippet(snippetId); // create the template final TemplateDTO templateDTO = new TemplateDTO(); templateDTO.setName(name); templateDTO.setDescription(description); templateDTO.setTimestamp(new Date()); templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true, true)); templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION); // set the id based on the specified seed final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); templateDTO.setId(uuid); // create the template final Template template = templateDAO.createTemplate(templateDTO, groupId); // drop the snippet snippetDAO.dropSnippet(snippetId); // save the flow controllerFacade.save(); return dtoFactory.createTemplateDTO(template); } /** * Ensures default values are populated for all components in this snippet. This is necessary to handle old templates without default values * and when existing properties have default values introduced. * * @param snippet snippet */ private void ensureDefaultPropertyValuesArePopulated(final FlowSnippetDTO snippet) { if (snippet != null) { if (snippet.getControllerServices() != null) { snippet.getControllerServices().forEach(dto -> { if (dto.getProperties() == null) { dto.setProperties(new LinkedHashMap<>()); } try { final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle()); configurableComponent.getPropertyDescriptors().forEach(descriptor -> { if (dto.getProperties().get(descriptor.getName()) == null) { dto.getProperties().put(descriptor.getName(), descriptor.getDefaultValue()); } }); } catch (final Exception e) { logger.warn(String.format("Unable to create ControllerService of type %s to populate default values.", dto.getType())); } }); } if (snippet.getProcessors() != null) { snippet.getProcessors().forEach(dto -> { if (dto.getConfig() == null) { dto.setConfig(new ProcessorConfigDTO()); } final ProcessorConfigDTO config = dto.getConfig(); if (config.getProperties() == null) { config.setProperties(new LinkedHashMap<>()); } try { final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle()); configurableComponent.getPropertyDescriptors().forEach(descriptor -> { if (config.getProperties().get(descriptor.getName()) == null) { config.getProperties().put(descriptor.getName(), descriptor.getDefaultValue()); } }); } catch (final Exception e) { logger.warn(String.format("Unable to create Processor of type %s to populate default values.", dto.getType())); } }); } if (snippet.getProcessGroups() != null) { snippet.getProcessGroups().forEach(processGroup -> { ensureDefaultPropertyValuesArePopulated(processGroup.getContents()); }); } } } @Override public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional idGenerationSeed) { // ensure id is set final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString(); templateDTO.setId(uuid); // mark the timestamp templateDTO.setTimestamp(new Date()); // ensure default values are populated ensureDefaultPropertyValuesArePopulated(templateDTO.getSnippet()); // import the template final Template template = templateDAO.importTemplate(templateDTO, groupId); // save the flow controllerFacade.save(); // return the template dto return dtoFactory.createTemplateDTO(template); } /** * Post processes a new flow snippet including validation, removing the snippet, and DTO conversion. * * @param groupId group id * @param snippet snippet * @return flow dto */ private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) { // validate the new snippet validateSnippetContents(snippet); // identify all components added final Set identifiers = new HashSet<>(); snippet.getProcessors().stream() .map(proc -> proc.getId()) .forEach(id -> identifiers.add(id)); snippet.getConnections().stream() .map(conn -> conn.getId()) .forEach(id -> identifiers.add(id)); snippet.getInputPorts().stream() .map(port -> port.getId()) .forEach(id -> identifiers.add(id)); snippet.getOutputPorts().stream() .map(port -> port.getId()) .forEach(id -> identifiers.add(id)); snippet.getProcessGroups().stream() .map(group -> group.getId()) .forEach(id -> identifiers.add(id)); snippet.getRemoteProcessGroups().stream() .map(remoteGroup -> remoteGroup.getId()) .forEach(id -> identifiers.add(id)); snippet.getRemoteProcessGroups().stream() .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null) .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream()) .map(remoteInputPort -> remoteInputPort.getId()) .forEach(id -> identifiers.add(id)); snippet.getRemoteProcessGroups().stream() .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null) .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream()) .map(remoteOutputPort -> remoteOutputPort.getId()) .forEach(id -> identifiers.add(id)); snippet.getLabels().stream() .map(label -> label.getId()) .forEach(id -> identifiers.add(id)); final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins); } @Override public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateEncodingVersion, final FlowSnippetDTO requestSnippet, final String idGenerationSeed) { // instantiate the template - there is no need to make another copy of the flow snippet since the actual template // was copied and this dto is only used to instantiate it's components (which as already completed) final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateEncodingVersion, requestSnippet, idGenerationSeed); // save the flow controllerFacade.save(); // post process the new flow snippet final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); final FlowEntity flowEntity = new FlowEntity(); flowEntity.setFlow(flowDto); return flowEntity; } @Override public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) { controllerServiceDTO.setParentGroupId(groupId); final NiFiUser user = NiFiUserUtils.getNiFiUser(); // request claim for component to be created... revision already verified (version == 0) final RevisionClaim claim = new StandardRevisionClaim(revision); final RevisionUpdate snapshot; if (groupId == null) { // update revision through revision manager snapshot = revisionManager.updateRevision(claim, user, () -> { // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock // on the group. The Controller Service may or may not have a Process Group (it won't if it's controller-scoped). final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); controllerFacade.save(); awaitValidationCompletion(controllerService); final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); } else { snapshot = revisionManager.updateRevision(claim, user, () -> { final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); controllerFacade.save(); awaitValidationCompletion(controllerService); final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); } final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); } @Override public ControllerServiceEntity updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) { // get the component, ensure we have access to it, and perform the update request final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, controllerService, () -> controllerServiceDAO.updateControllerService(controllerServiceDTO), cs -> { awaitValidationCompletion(cs); final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(cs); final ControllerServiceReference ref = controllerService.getReferences(); final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerService.getIdentifier())); dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents()); return dto; }); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); } @Override public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents( final Map referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values()); final NiFiUser user = NiFiUserUtils.getNiFiUser(); final RevisionUpdate update = revisionManager.updateRevision(claim, user, new UpdateRevisionTask() { @Override public RevisionUpdate update() { final Set updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences(); // get the revisions of the updated components final Map updatedRevisions = new HashMap<>(); for (final ComponentNode component : updated) { final Revision currentRevision = revisionManager.getRevision(component.getIdentifier()); final Revision requestRevision = referenceRevisions.get(component.getIdentifier()); updatedRevisions.put(component.getIdentifier(), currentRevision.incrementRevision(requestRevision.getClientId())); } // ensure the revision for all referencing components is included regardless of whether they were updated in this request for (final ComponentNode component : updatedReference.findRecursiveReferences(ComponentNode.class)) { updatedRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); } final ControllerServiceReferencingComponentsEntity entity = createControllerServiceReferencingComponentsEntity(updatedReference, updatedRevisions); return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); } }); return update.getComponent(); } /** * Finds the identifiers for all components referencing a ControllerService. * * @param reference ControllerServiceReference * @param visited ControllerServices we've already visited */ private void findControllerServiceReferencingComponentIdentifiers(final ControllerServiceReference reference, final Set visited) { for (final ComponentNode component : reference.getReferencingComponents()) { // if this is a ControllerService consider it's referencing components if (component instanceof ControllerServiceNode) { final ControllerServiceNode node = (ControllerServiceNode) component; if (!visited.contains(node)) { visited.add(node); findControllerServiceReferencingComponentIdentifiers(node.getReferences(), visited); } } } } /** * Creates entities for components referencing a ControllerService using their current revision. * * @param reference ControllerServiceReference * @return The entity */ private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set lockedIds) { final Set visited = new HashSet<>(); visited.add(reference.getReferencedComponent()); findControllerServiceReferencingComponentIdentifiers(reference, visited); final Map referencingRevisions = new HashMap<>(); for (final ComponentNode component : reference.getReferencingComponents()) { referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); } return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions); } /** * Creates entities for components referencing a ControllerService using the specified revisions. * * @param reference ControllerServiceReference * @param revisions The revisions * @return The entity */ private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity( final ControllerServiceReference reference, final Map revisions) { final Set visited = new HashSet<>(); visited.add(reference.getReferencedComponent()); return createControllerServiceReferencingComponentsEntity(reference, revisions, visited); } /** * Creates entities for components referencing a ControllerServcie using the specified revisions. * * @param reference ControllerServiceReference * @param revisions The revisions * @param visited Which services we've already considered (in case of cycle) * @return The entity */ private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity( final ControllerServiceReference reference, final Map revisions, final Set visited) { final String modifier = NiFiUserUtils.getNiFiUserIdentity(); final Set referencingComponents = reference.getReferencingComponents(); final Set componentEntities = new HashSet<>(); for (final ComponentNode refComponent : referencingComponents) { final PermissionsDTO permissions = dtoFactory.createPermissionsDto(refComponent); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(refComponent)); final Revision revision = revisions.get(refComponent.getIdentifier()); final FlowModification flowMod = new FlowModification(revision, modifier); final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(flowMod); final ControllerServiceReferencingComponentDTO dto = dtoFactory.createControllerServiceReferencingComponentDTO(refComponent); if (refComponent instanceof ControllerServiceNode) { final ControllerServiceNode node = (ControllerServiceNode) refComponent; // indicate if we've hit a cycle dto.setReferenceCycle(visited.contains(node)); // mark node as visited before building the reference cycle visited.add(node); // if we haven't encountered this service before include it's referencing components if (!dto.getReferenceCycle()) { final ControllerServiceReference refReferences = node.getReferences(); final Map referencingRevisions = new HashMap<>(revisions); for (final ComponentNode component : refReferences.getReferencingComponents()) { referencingRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); } final ControllerServiceReferencingComponentsEntity references = createControllerServiceReferencingComponentsEntity(refReferences, referencingRevisions, visited); dto.setReferencingComponents(references.getControllerServiceReferencingComponents()); } } componentEntities.add(entityFactory.createControllerServiceReferencingComponentEntity(refComponent.getIdentifier(), dto, revisionDto, permissions, operatePermissions)); } final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity(); entity.setControllerServiceReferencingComponents(componentEntities); return entity; } @Override public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) { final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService)); final ControllerServiceDTO snapshot = deleteComponent( revision, controllerService.getResource(), () -> controllerServiceDAO.deleteControllerService(controllerServiceId), true, dtoFactory.createControllerServiceDto(controllerService)); return entityFactory.createControllerServiceEntity(snapshot, null, permissions, operatePermissions, null); } @Override public RegistryClientEntity createRegistryClient(Revision revision, RegistryDTO registryDTO) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); // request claim for component to be created... revision already verified (version == 0) final RevisionClaim claim = new StandardRevisionClaim(revision); // update revision through revision manager final RevisionUpdate revisionUpdate = revisionManager.updateRevision(claim, user, () -> { // add the component final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO); // save the flow controllerFacade.save(); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(registry, lastMod); }); final FlowRegistry registry = revisionUpdate.getComponent(); return createRegistryClientEntity(registry); } @Override public RegistryClientEntity getRegistryClient(final String registryId) { final FlowRegistry registry = registryDAO.getFlowRegistry(registryId); return createRegistryClientEntity(registry); } private RegistryClientEntity createRegistryClientEntity(final FlowRegistry flowRegistry) { if (flowRegistry == null) { return null; } final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(flowRegistry.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getController()); final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry); return entityFactory.createRegistryClientEntity(dto, revision, permissions); } private VersionedFlowEntity createVersionedFlowEntity(final String registryId, final VersionedFlow versionedFlow) { if (versionedFlow == null) { return null; } final VersionedFlowDTO dto = new VersionedFlowDTO(); dto.setRegistryId(registryId); dto.setBucketId(versionedFlow.getBucketIdentifier()); dto.setFlowId(versionedFlow.getIdentifier()); dto.setFlowName(versionedFlow.getName()); dto.setDescription(versionedFlow.getDescription()); final VersionedFlowEntity entity = new VersionedFlowEntity(); entity.setVersionedFlow(dto); return entity; } private VersionedFlowSnapshotMetadataEntity createVersionedFlowSnapshotMetadataEntity(final String registryId, final VersionedFlowSnapshotMetadata metadata) { if (metadata == null) { return null; } final VersionedFlowSnapshotMetadataEntity entity = new VersionedFlowSnapshotMetadataEntity(); entity.setRegistryId(registryId); entity.setVersionedFlowMetadata(metadata); return entity; } @Override public Set getRegistryClients() { return registryDAO.getFlowRegistries().stream() .map(this::createRegistryClientEntity) .collect(Collectors.toSet()); } @Override public Set getRegistriesForUser(final NiFiUser user) { return registryDAO.getFlowRegistriesForUser(user).stream() .map(flowRegistry -> entityFactory.createRegistryEntity(dtoFactory.createRegistryDto(flowRegistry))) .collect(Collectors.toSet()); } @Override public Set getBucketsForUser(final String registryId, final NiFiUser user) { return registryDAO.getBucketsForUser(registryId, user).stream() .map(bucket -> { if (bucket == null) { return null; } final BucketDTO dto = new BucketDTO(); dto.setId(bucket.getIdentifier()); dto.setName(bucket.getName()); dto.setDescription(bucket.getDescription()); dto.setCreated(bucket.getCreatedTimestamp()); final Permissions regPermissions = bucket.getPermissions(); final PermissionsDTO permissions = new PermissionsDTO(); permissions.setCanRead(regPermissions.getCanRead()); permissions.setCanWrite(regPermissions.getCanWrite()); return entityFactory.createBucketEntity(dto, permissions); }) .collect(Collectors.toSet()); } @Override public Set getFlowsForUser(String registryId, String bucketId, NiFiUser user) { return registryDAO.getFlowsForUser(registryId, bucketId, user).stream() .map(vf -> createVersionedFlowEntity(registryId, vf)) .collect(Collectors.toSet()); } @Override public Set getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) { return registryDAO.getFlowVersionsForUser(registryId, bucketId, flowId, user).stream() .map(md -> createVersionedFlowSnapshotMetadataEntity(registryId, md)) .collect(Collectors.toSet()); } @Override public RegistryClientEntity updateRegistryClient(Revision revision, RegistryDTO registryDTO) { final RevisionClaim revisionClaim = new StandardRevisionClaim(revision); final NiFiUser user = NiFiUserUtils.getNiFiUser(); final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId()); final RevisionUpdate revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> { final boolean duplicateName = registryDAO.getFlowRegistries().stream() .anyMatch(reg -> reg.getName().equals(registryDTO.getName()) && !reg.getIdentifier().equals(registryDTO.getId())); if (duplicateName) { throw new IllegalStateException("Cannot update Flow Registry because a Flow Registry already exists with the name " + registryDTO.getName()); } registry.setDescription(registryDTO.getDescription()); registry.setName(registryDTO.getName()); registry.setURL(registryDTO.getUri()); controllerFacade.save(); final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); return new StandardRevisionUpdate<>(registry, lastModification); }); final FlowRegistry updatedReg = revisionUpdate.getComponent(); return createRegistryClientEntity(updatedReg); } @Override public void verifyDeleteRegistry(String registryId) { processGroupDAO.verifyDeleteFlowRegistry(registryId); } @Override public RegistryClientEntity deleteRegistryClient(final Revision revision, final String registryId) { final RevisionClaim claim = new StandardRevisionClaim(revision); final NiFiUser user = NiFiUserUtils.getNiFiUser(); final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> { final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId); controllerFacade.save(); return reg; }); return createRegistryClientEntity(registry); } @Override public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); // request claim for component to be created... revision already verified (version == 0) final RevisionClaim claim = new StandardRevisionClaim(revision); // update revision through revision manager final RevisionUpdate snapshot = revisionManager.updateRevision(claim, user, () -> { // create the reporting task final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO); // save the update controllerFacade.save(); awaitValidationCompletion(reportingTask); final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); } @Override public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { // get the component, ensure we have access to it, and perform the update request final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); final RevisionUpdate snapshot = updateComponent(revision, reportingTask, () -> reportingTaskDAO.updateReportingTask(reportingTaskDTO), rt -> { awaitValidationCompletion(rt); return dtoFactory.createReportingTaskDto(rt); }); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities); } @Override public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) { final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(reportingTask); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask)); final ReportingTaskDTO snapshot = deleteComponent( revision, reportingTask.getResource(), () -> reportingTaskDAO.deleteReportingTask(reportingTaskId), true, dtoFactory.createReportingTaskDto(reportingTask)); return entityFactory.createReportingTaskEntity(snapshot, null, permissions, operatePermissions, null); } @Override public void deleteActions(final Date endDate) { // get the user from the request final NiFiUser user = NiFiUserUtils.getNiFiUser(); if (user == null) { throw new WebApplicationException(new Throwable("Unable to access details for current user.")); } // create the purge details final FlowChangePurgeDetails details = new FlowChangePurgeDetails(); details.setEndDate(endDate); // create a purge action to record that records are being removed final FlowChangeAction purgeAction = new FlowChangeAction(); purgeAction.setUserIdentity(user.getIdentity()); purgeAction.setOperation(Operation.Purge); purgeAction.setTimestamp(new Date()); purgeAction.setSourceId("Flow Controller"); purgeAction.setSourceName("History"); purgeAction.setSourceType(Component.Controller); purgeAction.setActionDetails(details); // purge corresponding actions auditService.purgeActions(endDate, purgeAction); } @Override public ProvenanceDTO submitProvenance(final ProvenanceDTO query) { return controllerFacade.submitProvenance(query); } @Override public void deleteProvenance(final String queryId) { controllerFacade.deleteProvenanceQuery(queryId); } @Override public LineageDTO submitLineage(final LineageDTO lineage) { return controllerFacade.submitLineage(lineage); } @Override public void deleteLineage(final String lineageId) { controllerFacade.deleteLineage(lineageId); } @Override public ProvenanceEventDTO submitReplay(final Long eventId) { return controllerFacade.submitReplay(eventId); } // ----------------------------------------- // Read Operations // ----------------------------------------- @Override public SearchResultsDTO searchController(final String query) { return controllerFacade.search(query); } @Override public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) { return connectionDAO.getContent(connectionId, flowFileUuid, uri); } @Override public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) { return controllerFacade.getContent(eventId, uri, contentDirection); } @Override public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) { return controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults); } @Override public LineageDTO getLineage(final String lineageId) { return controllerFacade.getLineage(lineageId); } @Override public ProvenanceOptionsDTO getProvenanceSearchOptions() { return controllerFacade.getProvenanceSearchOptions(); } @Override public ProvenanceEventDTO getProvenanceEvent(final Long id) { return controllerFacade.getProvenanceEvent(id); } @Override public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); final ProcessGroupStatusDTO dto = dtoFactory.createProcessGroupStatusDto(processGroup, controllerFacade.getProcessGroupStatus(groupId)); // prune the response as necessary if (!recursive) { pruneChildGroups(dto.getAggregateSnapshot()); if (dto.getNodeSnapshots() != null) { for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : dto.getNodeSnapshots()) { pruneChildGroups(nodeSnapshot.getStatusSnapshot()); } } } return entityFactory.createProcessGroupStatusEntity(dto, permissions); } private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) { for (final ProcessGroupStatusSnapshotEntity childProcessGroupStatusEntity : snapshot.getProcessGroupStatusSnapshots()) { final ProcessGroupStatusSnapshotDTO childProcessGroupStatus = childProcessGroupStatusEntity.getProcessGroupStatusSnapshot(); childProcessGroupStatus.setConnectionStatusSnapshots(null); childProcessGroupStatus.setProcessGroupStatusSnapshots(null); childProcessGroupStatus.setInputPortStatusSnapshots(null); childProcessGroupStatus.setOutputPortStatusSnapshots(null); childProcessGroupStatus.setProcessorStatusSnapshots(null); childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null); } } @Override public ControllerStatusDTO getControllerStatus() { return controllerFacade.getControllerStatus(); } @Override public ComponentStateDTO getProcessorState(final String processorId) { final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); // processor will be non null as it was already found when getting the state final ProcessorNode processor = processorDAO.getProcessor(processorId); return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState); } @Override public ComponentStateDTO getControllerServiceState(final String controllerServiceId) { final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); // controller service will be non null as it was already found when getting the state final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState); } @Override public ComponentStateDTO getReportingTaskState(final String reportingTaskId) { final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); // reporting task will be non null as it was already found when getting the state final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState); } @Override public CountersDTO getCounters() { final List counters = controllerFacade.getCounters(); final Set counterDTOs = new LinkedHashSet<>(counters.size()); for (final Counter counter : counters) { counterDTOs.add(dtoFactory.createCounterDto(counter)); } final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs); final CountersDTO countersDto = new CountersDTO(); countersDto.setAggregateSnapshot(snapshotDto); return countersDto; } private ConnectionEntity createConnectionEntity(final Connection connection) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier())); return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, permissions, status); } @Override public Set getConnections(final String groupId) { final Set connections = connectionDAO.getConnections(groupId); return connections.stream() .map(connection -> createConnectionEntity(connection)) .collect(Collectors.toSet()); } @Override public ConnectionEntity getConnection(final String connectionId) { final Connection connection = connectionDAO.getConnection(connectionId); return createConnectionEntity(connection); } @Override public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) { return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId)); } @Override public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) { final Connection connection = connectionDAO.getConnection(connectionId); final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(connectionId, listingRequestId)); // include whether the source and destination are running if (connection.getSource() != null) { listRequest.setSourceRunning(connection.getSource().isRunning()); } if (connection.getDestination() != null) { listRequest.setDestinationRunning(connection.getDestination().isRunning()); } return listRequest; } @Override public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) { return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid)); } @Override public ConnectionStatusEntity getConnectionStatus(final String connectionId) { final Connection connection = connectionDAO.getConnection(connectionId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); final ConnectionStatusDTO dto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId)); return entityFactory.createConnectionStatusEntity(dto, permissions); } @Override public StatusHistoryEntity getConnectionStatusHistory(final String connectionId) { final Connection connection = connectionDAO.getConnection(connectionId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); final StatusHistoryDTO dto = controllerFacade.getConnectionStatusHistory(connectionId); return entityFactory.createStatusHistoryEntity(dto, permissions); } private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor)); final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, permissions, operatePermissions, status, bulletinEntities); } @Override public Set getProcessors(final String groupId, final boolean includeDescendants) { final Set processors = processorDAO.getProcessors(groupId, includeDescendants); final NiFiUser user = NiFiUserUtils.getNiFiUser(); return processors.stream() .map(processor -> createProcessorEntity(processor, user)) .collect(Collectors.toSet()); } @Override public TemplateDTO exportTemplate(final String id) { final Template template = templateDAO.getTemplate(id); final TemplateDTO templateDetails = template.getDetails(); final TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template); templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet())); return templateDTO; } @Override public TemplateDTO getTemplate(final String id) { return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id)); } @Override public Set getTemplates() { return templateDAO.getTemplates().stream() .map(template -> { final TemplateDTO dto = dtoFactory.createTemplateDTO(template); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(template); final TemplateEntity entity = new TemplateEntity(); entity.setId(dto.getId()); entity.setPermissions(permissions); entity.setTemplate(dto); return entity; }).collect(Collectors.toSet()); } @Override public Set getWorkQueuePrioritizerTypes() { return controllerFacade.getFlowFileComparatorTypes(); } @Override public Set getProcessorTypes(final String bundleGroup, final String bundleArtifact, final String type) { return controllerFacade.getFlowFileProcessorTypes(bundleGroup, bundleArtifact, type); } @Override public Set getControllerServiceTypes(final String serviceType, final String serviceBundleGroup, final String serviceBundleArtifact, final String serviceBundleVersion, final String bundleGroup, final String bundleArtifact, final String type) { return controllerFacade.getControllerServiceTypes(serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion, bundleGroup, bundleArtifact, type); } @Override public Set getReportingTaskTypes(final String bundleGroup, final String bundleArtifact, final String type) { return controllerFacade.getReportingTaskTypes(bundleGroup, bundleArtifact, type); } @Override public ProcessorEntity getProcessor(final String id) { final ProcessorNode processor = processorDAO.getProcessor(id); return createProcessorEntity(processor, NiFiUserUtils.getNiFiUser()); } @Override public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) { final ProcessorNode processor = processorDAO.getProcessor(id); PropertyDescriptor descriptor = processor.getPropertyDescriptor(property); // return an invalid descriptor if the processor doesn't support this property if (descriptor == null) { descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build(); } return dtoFactory.createPropertyDescriptorDto(descriptor, processor.getProcessGroup().getIdentifier()); } @Override public ProcessorStatusEntity getProcessorStatus(final String id) { final ProcessorNode processor = processorDAO.getProcessor(id); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); final ProcessorStatusDTO dto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id)); return entityFactory.createProcessorStatusEntity(dto, permissions); } @Override public StatusHistoryEntity getProcessorStatusHistory(final String id) { final ProcessorNode processor = processorDAO.getProcessor(id); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); final StatusHistoryDTO dto = controllerFacade.getProcessorStatusHistory(id); return entityFactory.createStatusHistoryEntity(dto, permissions); } private boolean authorizeBulletin(final Bulletin bulletin) { final String sourceId = bulletin.getSourceId(); final ComponentType type = bulletin.getSourceType(); final Authorizable authorizable; try { switch (type) { case PROCESSOR: authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable(); break; case REPORTING_TASK: authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable(); break; case CONTROLLER_SERVICE: authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable(); break; case FLOW_CONTROLLER: authorizable = controllerFacade; break; case INPUT_PORT: authorizable = authorizableLookup.getInputPort(sourceId); break; case OUTPUT_PORT: authorizable = authorizableLookup.getOutputPort(sourceId); break; case REMOTE_PROCESS_GROUP: authorizable = authorizableLookup.getRemoteProcessGroup(sourceId); break; default: throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build()); } } catch (final ResourceNotFoundException e) { // if the underlying component is gone, disallow return false; } // perform the authorization final AuthorizationResult result = authorizable.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); return Result.Approved.equals(result.getResult()); } @Override public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) { // build the query final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder() .groupIdMatches(query.getGroupId()) .sourceIdMatches(query.getSourceId()) .nameMatches(query.getName()) .messageMatches(query.getMessage()) .after(query.getAfter()) .limit(query.getLimit()); // perform the query final List results = bulletinRepository.findBulletins(queryBuilder.build()); // perform the query and generate the results - iterating in reverse order since we are // getting the most recent results by ordering by timestamp desc above. this gets the // exact results we want but in reverse order final List bulletinEntities = new ArrayList<>(); for (final ListIterator bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious(); ) { final Bulletin bulletin = bulletinIter.previous(); bulletinEntities.add(entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin))); } // create the bulletin board final BulletinBoardDTO bulletinBoard = new BulletinBoardDTO(); bulletinBoard.setBulletins(bulletinEntities); bulletinBoard.setGenerated(new Date()); return bulletinBoard; } @Override public SystemDiagnosticsDTO getSystemDiagnostics() { final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics(); return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics); } @Override public List getResources() { final List resources = controllerFacade.getResources(); final List resourceDtos = new ArrayList<>(resources.size()); for (final Resource resource : resources) { resourceDtos.add(dtoFactory.createResourceDto(resource)); } return resourceDtos; } @Override public void discoverCompatibleBundles(VersionedProcessGroup versionedGroup) { BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), versionedGroup); } @Override public BundleCoordinate getCompatibleBundle(String type, BundleDTO bundleDTO) { return BundleUtils.getCompatibleBundle(controllerFacade.getExtensionManager(), type, bundleDTO); } @Override public ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate) { return controllerFacade.getExtensionManager().getTempComponent(classType, bundleCoordinate); } /** * Ensures the specified user has permission to access the specified port. This method does * not utilize the DataTransferAuthorizable as that will enforce the entire chain is * authorized for the transfer. This method is only invoked when obtaining the site to site * details so the entire chain isn't necessary. */ private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) { final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure()); // if site to site is not secure, allow all users if (!isSiteToSiteSecure) { return true; } final Map userContext; if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) { userContext = new HashMap<>(); userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress()); } else { userContext = null; } final AuthorizationRequest request = new AuthorizationRequest.Builder() .resource(ResourceFactory.getDataTransferResource(port.getResource())) .identity(user.getIdentity()) .groups(user.getGroups()) .anonymous(user.isAnonymous()) .accessAttempt(false) .action(RequestAction.WRITE) .userContext(userContext) .explanationSupplier(() -> "Unable to retrieve port details.") .build(); final AuthorizationResult result = authorizer.authorize(request); return Result.Approved.equals(result.getResult()); } @Override public ControllerDTO getSiteToSiteDetails() { final NiFiUser user = NiFiUserUtils.getNiFiUser(); if (user == null) { throw new WebApplicationException(new Throwable("Unable to access details for current user.")); } // serialize the input ports this NiFi has access to final Set inputPortDtos = new LinkedHashSet<>(); final Set inputPorts = controllerFacade.getInputPorts(); for (final RootGroupPort inputPort : inputPorts) { if (isUserAuthorized(user, inputPort)) { final PortDTO dto = new PortDTO(); dto.setId(inputPort.getIdentifier()); dto.setName(inputPort.getName()); dto.setComments(inputPort.getComments()); dto.setState(inputPort.getScheduledState().toString()); inputPortDtos.add(dto); } } // serialize the output ports this NiFi has access to final Set outputPortDtos = new LinkedHashSet<>(); for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) { if (isUserAuthorized(user, outputPort)) { final PortDTO dto = new PortDTO(); dto.setId(outputPort.getIdentifier()); dto.setName(outputPort.getName()); dto.setComments(outputPort.getComments()); dto.setState(outputPort.getScheduledState().toString()); outputPortDtos.add(dto); } } // get the root group final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); final ProcessGroupCounts counts = rootGroup.getCounts(); // create the controller dto final ControllerDTO controllerDTO = new ControllerDTO(); controllerDTO.setId(controllerFacade.getRootGroupId()); controllerDTO.setInstanceId(controllerFacade.getInstanceId()); controllerDTO.setName(controllerFacade.getName()); controllerDTO.setComments(controllerFacade.getComments()); controllerDTO.setInputPorts(inputPortDtos); controllerDTO.setOutputPorts(outputPortDtos); controllerDTO.setInputPortCount(inputPortDtos.size()); controllerDTO.setOutputPortCount(outputPortDtos.size()); controllerDTO.setRunningCount(counts.getRunningCount()); controllerDTO.setStoppedCount(counts.getStoppedCount()); controllerDTO.setInvalidCount(counts.getInvalidCount()); controllerDTO.setDisabledCount(counts.getDisabledCount()); // determine the site to site configuration controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort()); controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort()); controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure()); return controllerDTO; } @Override public ControllerConfigurationEntity getControllerConfiguration() { final Revision rev = revisionManager.getRevision(FlowController.class.getSimpleName()); final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade); final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); return entityFactory.createControllerConfigurationEntity(dto, revision, permissions); } @Override public ControllerBulletinsEntity getControllerBulletins() { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity(); final List controllerBulletinEntities = new ArrayList<>(); final Authorizable controllerAuthorizable = authorizableLookup.getController(); final boolean authorized = controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()); controllerBulletinEntities.addAll(bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, authorized)).collect(Collectors.toList())); // get the controller service bulletins final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); final List allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery); final List controllerServiceBulletinEntities = new ArrayList<>(); for (final Bulletin bulletin : allControllerServiceBulletins) { try { final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable(); final boolean controllerServiceAuthorized = controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); final BulletinEntity controllerServiceBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), controllerServiceAuthorized); controllerServiceBulletinEntities.add(controllerServiceBulletin); controllerBulletinEntities.add(controllerServiceBulletin); } catch (final ResourceNotFoundException e) { // controller service missing.. skip } } controllerBulletinsEntity.setControllerServiceBulletins(controllerServiceBulletinEntities); // get the reporting task bulletins final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); final List allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery); final List reportingTaskBulletinEntities = new ArrayList<>(); for (final Bulletin bulletin : allReportingTaskBulletins) { try { final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable(); final boolean reportingTaskAuthorizableAuthorized = reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user); final BulletinEntity reportingTaskBulletin = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), reportingTaskAuthorizableAuthorized); reportingTaskBulletinEntities.add(reportingTaskBulletin); controllerBulletinEntities.add(reportingTaskBulletin); } catch (final ResourceNotFoundException e) { // reporting task missing.. skip } } controllerBulletinsEntity.setReportingTaskBulletins(reportingTaskBulletinEntities); controllerBulletinsEntity.setBulletins(pruneAndSortBulletins(controllerBulletinEntities, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER)); return controllerBulletinsEntity; } @Override public FlowConfigurationEntity getFlowConfiguration() { final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval(), properties.getDefaultBackPressureObjectThreshold(), properties.getDefaultBackPressureDataSizeThreshold(),properties.getDcaeDistributorApiHostname()); final FlowConfigurationEntity entity = new FlowConfigurationEntity(); entity.setFlowConfiguration(dto); return entity; } @Override public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) { final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); return createAccessPolicyEntity(accessPolicy); } @Override public AccessPolicyEntity getAccessPolicy(final RequestAction requestAction, final String resource) { Authorizable authorizable; try { authorizable = authorizableLookup.getAuthorizableFromResource(resource); } catch (final ResourceNotFoundException e) { // unable to find the underlying authorizable... user authorized based on top level /policies... create // an anonymous authorizable to attempt to locate an existing policy for this resource authorizable = new Authorizable() { @Override public Authorizable getParentAuthorizable() { return null; } @Override public Resource getResource() { return new Resource() { @Override public String getIdentifier() { return resource; } @Override public String getName() { return resource; } @Override public String getSafeDescription() { return "Policy " + resource; } }; } }; } final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(requestAction, authorizable); return createAccessPolicyEntity(accessPolicy); } private AccessPolicyEntity createAccessPolicyEntity(final AccessPolicy accessPolicy) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicy.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicy.getIdentifier())); final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource()); return entityFactory.createAccessPolicyEntity( dtoFactory.createAccessPolicyDto(accessPolicy, accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()), accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()), componentReference), revision, permissions); } @Override public UserEntity getUser(final String userId) { final User user = userDAO.getUser(userId); return createUserEntity(user, true); } @Override public Set getUsers() { final Set users = userDAO.getUsers(); return users.stream() .map(user -> createUserEntity(user, false)) .collect(Collectors.toSet()); } private UserEntity createUserEntity(final User user, final boolean enforceUserExistence) { final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); final Set userGroups = userGroupDAO.getUserGroupsForUser(user.getIdentifier()).stream() .map(g -> g.getIdentifier()).map(mapUserGroupIdToTenantEntity(enforceUserExistence)).collect(Collectors.toSet()); final Set policyEntities = userGroupDAO.getAccessPoliciesForUser(user.getIdentifier()).stream() .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups, policyEntities), userRevision, permissions); } private UserGroupEntity createUserGroupEntity(final Group userGroup, final boolean enforceGroupExistence) { final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant()); final Set users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity(enforceGroupExistence)).collect(Collectors.toSet()); final Set policyEntities = userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream() .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()); return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users, policyEntities), userGroupRevision, permissions); } @Override public UserGroupEntity getUserGroup(final String userGroupId) { final Group userGroup = userGroupDAO.getUserGroup(userGroupId); return createUserGroupEntity(userGroup, true); } @Override public Set getUserGroups() { final Set userGroups = userGroupDAO.getUserGroups(); return userGroups.stream() .map(userGroup -> createUserGroupEntity(userGroup, false)) .collect(Collectors.toSet()); } private LabelEntity createLabelEntity(final Label label) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label); return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), revision, permissions); } @Override public Set getLabels(final String groupId) { final Set