From 9507f2f8d2ec616f01f5ee8825106300b95e8ddc Mon Sep 17 00:00:00 2001 From: Andrew Gauld Date: Fri, 7 Feb 2020 15:00:39 +0000 Subject: Add DCAE MOD design tool project Change-Id: I660b28ebfaa7e4b5f03a1df5fd17d126f58b7c14 Issue-ID: DCAEGEN2-1860 Signed-off-by: Andrew Gauld --- .../org/apache/nifi/controller/AbstractPort.java | 675 +++++++++++++++++++++ 1 file changed, 675 insertions(+) create mode 100644 mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java (limited to 'mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java') diff --git a/mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java new file mode 100644 index 0000000..6023fc2 --- /dev/null +++ b/mod/designtool/designtool-web/src/main/java/org/apache/nifi/controller/AbstractPort.java @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications to the original nifi code for the ONAP project are made + * available under the Apache License, Version 2.0 + */ +package org.apache.nifi.controller; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.CharacterFilterUtils; +import org.apache.nifi.util.FormatUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static java.util.Objects.requireNonNull; + +public abstract class AbstractPort implements Port { + + public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder() + .description("The relationship through which all Flow Files are transferred") + .name("") + .build(); + + public static final long MINIMUM_PENALIZATION_MILLIS = 0L; + public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + + public static final long MINIMUM_YIELD_MILLIS = 0L; + public static final long DEFAULT_YIELD_PERIOD = 10000L; + public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS; + + private final List relationships; + + private final String id; + private final ConnectableType type; + private final AtomicReference name; + private final AtomicReference position; + private final AtomicReference comments; + private final AtomicReference processGroup; + private final AtomicBoolean lossTolerant; + private final AtomicReference scheduledState; + private final AtomicInteger concurrentTaskCount; + private final AtomicReference penalizationPeriod; + private final AtomicReference yieldPeriod; + private final AtomicReference schedulingPeriod; + private final AtomicReference versionedComponentId = new AtomicReference<>(); + private final AtomicLong schedulingNanos; + private final AtomicLong yieldExpiration; + private final ProcessScheduler processScheduler; + + private final Set outgoingConnections; + private final List incomingConnections; + + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) { + this.id = requireNonNull(id); + this.name = new AtomicReference<>(requireNonNull(name)); + position = new AtomicReference<>(new Position(0D, 0D)); + outgoingConnections = new HashSet<>(); + incomingConnections = new ArrayList<>(); + comments = new AtomicReference<>(); + lossTolerant = new AtomicBoolean(false); + concurrentTaskCount = new AtomicInteger(1); + processScheduler = scheduler; + + final List relationshipList = new ArrayList<>(); + relationshipList.add(PORT_RELATIONSHIP); + relationships = Collections.unmodifiableList(relationshipList); + this.processGroup = new AtomicReference<>(processGroup); + this.type = type; + penalizationPeriod = new AtomicReference<>("30 sec"); + yieldPeriod = new AtomicReference<>("1 sec"); + yieldExpiration = new AtomicLong(0L); + schedulingPeriod = new AtomicReference<>("0 millis"); + schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS); + scheduledState = new AtomicReference<>(ScheduledState.STOPPED); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public String getProcessGroupIdentifier() { + final ProcessGroup procGroup = getProcessGroup(); + return procGroup == null ? null : procGroup.getIdentifier(); + } + + @Override + public String getName() { + return name.get(); + } + + @Override + public void setName(final String name) { + if (this.name.get().equals(name)) { + return; + } + + final ProcessGroup parentGroup = this.processGroup.get(); + if (getConnectableType() == ConnectableType.INPUT_PORT) { + if (parentGroup.getInputPortByName(name) != null) { + throw new IllegalStateException("The requested new port name is not available"); + } + } else if (getConnectableType() == ConnectableType.OUTPUT_PORT) { + if (parentGroup.getOutputPortByName(name) != null) { + throw new IllegalStateException("The requested new port name is not available"); + } + } + + this.name.set(name); + } + + @Override + public Authorizable getParentAuthorizable() { + return getProcessGroup(); + } + + @Override + public Resource getResource() { + final ResourceType resourceType = ConnectableType.INPUT_PORT.equals(getConnectableType()) ? ResourceType.InputPort : ResourceType.OutputPort; + return ResourceFactory.getComponentResource(resourceType, getIdentifier(), getName()); + } + + @Override + public ProcessGroup getProcessGroup() { + return processGroup.get(); + } + + @Override + public void setProcessGroup(final ProcessGroup newGroup) { + this.processGroup.set(newGroup); + } + + @Override + public String getComments() { + return comments.get(); + } + + @Override + public void setComments(final String comments) { + this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments)); + } + + @Override + public Collection getRelationships() { + return relationships; + } + + @Override + public Relationship getRelationship(final String relationshipName) { + if (PORT_RELATIONSHIP.getName().equals(relationshipName)) { + return PORT_RELATIONSHIP; + } + return null; + } + + @Override + public void addConnection(final Connection connection) throws IllegalArgumentException { + writeLock.lock(); + try { + if (!requireNonNull(connection).getSource().equals(this)) { + if (connection.getDestination().equals(this)) { + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + if (!incomingConnections.contains(connection)) { + incomingConnections.add(connection); + } + + return; + } else { + throw new IllegalArgumentException("Cannot add a connection to a LocalPort for which the LocalPort is neither the Source nor the Destination"); + } + } + + /* TODO: Will commenting this out have repercussions? + Needed to comment this out to allow use of relationships for port to processor case which was previously not supported + for (final Relationship relationship : connection.getRelationships()) { + if (!relationship.equals(PORT_RELATIONSHIP)) { + throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Local Ports"); + } + } + */ + + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + if (!outgoingConnections.contains(connection)) { + outgoingConnections.add(connection); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean hasIncomingConnection() { + readLock.lock(); + try { + return !incomingConnections.isEmpty(); + } finally { + readLock.unlock(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final ProcessSession session = sessionFactory.createSession(); + + try { + onTrigger(context, session); + session.commit(); + } catch (final ProcessException e) { + session.rollback(); + throw e; + } catch (final Throwable t) { + session.rollback(); + throw new RuntimeException(t); + } + } + + public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException; + + @Override + public void updateConnection(final Connection connection) throws IllegalStateException { + if (requireNonNull(connection).getSource().equals(this)) { + writeLock.lock(); + try { + if (!outgoingConnections.remove(connection)) { + throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); + } + outgoingConnections.add(connection); + } finally { + writeLock.unlock(); + } + } else if (connection.getDestination().equals(this)) { + writeLock.lock(); + try { + if (!incomingConnections.remove(connection)) { + throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); + } + incomingConnections.add(connection); + } finally { + writeLock.unlock(); + } + } else { + throw new IllegalStateException("The given connection is not currently registered for this Port"); + } + } + + @Override + public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { + writeLock.lock(); + try { + if (!requireNonNull(connection).getSource().equals(this)) { + final boolean existed = incomingConnections.remove(connection); + if (!existed) { + throw new IllegalStateException("The given connection is not currently registered for this Port"); + } + return; + } + + if (!canConnectionBeRemoved(connection)) { + // TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean + throw new IllegalStateException("Connection " + connection.getIdentifier() + " cannot be removed"); + } + + final boolean removed = outgoingConnections.remove(connection); + if (!removed) { + throw new IllegalStateException("Connection " + connection.getIdentifier() + " is not registered with " + this.getIdentifier()); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Verify that removing this connection will not prevent this Port from + * still being connected via each relationship + * + * @param connection to test for removal + * @return true if can be removed + */ + private boolean canConnectionBeRemoved(final Connection connection) { + final Connectable source = connection.getSource(); + if (!source.isRunning()) { + // we don't have to verify that this Connectable is still connected because it's okay to make + // the source invalid since it is not running. + return true; + } + + for (final Relationship relationship : source.getRelationships()) { + if (source.isAutoTerminated(relationship)) { + continue; + } + + final Set connectionsForRelationship = source.getConnections(relationship); + if (connectionsForRelationship == null || connectionsForRelationship.isEmpty()) { + return false; + } + } + + return true; + } + + @Override + public Set getConnections() { + readLock.lock(); + try { + return Collections.unmodifiableSet(outgoingConnections); + } finally { + readLock.unlock(); + } + } + + @Override + public Set getConnections(final Relationship relationship) { + readLock.lock(); + try { + if (relationship.equals(PORT_RELATIONSHIP)) { + return Collections.unmodifiableSet(outgoingConnections); + } + + throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Local Ports"); + } finally { + readLock.unlock(); + } + } + + @Override + public Position getPosition() { + return position.get(); + } + + @Override + public void setPosition(final Position position) { + this.position.set(position); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString(); + } + + @Override + public List getIncomingConnections() { + readLock.lock(); + try { + return Collections.unmodifiableList(incomingConnections); + } finally { + readLock.unlock(); + } + } + + @Override + public abstract boolean isValid(); + + @Override + public boolean isAutoTerminated(final Relationship relationship) { + return false; + } + + @Override + public boolean isLossTolerant() { + return lossTolerant.get(); + } + + @Override + public void setLossTolerant(boolean lossTolerant) { + this.lossTolerant.set(lossTolerant); + } + + @Override + public void setMaxConcurrentTasks(final int taskCount) { + if (taskCount < 1) { + throw new IllegalArgumentException(); + } + concurrentTaskCount.set(taskCount); + } + + @Override + public int getMaxConcurrentTasks() { + return concurrentTaskCount.get(); + } + + @Override + public void shutdown() { + scheduledState.set(ScheduledState.STOPPED); + } + + @Override + public void onSchedulingStart() { + scheduledState.set(ScheduledState.RUNNING); + } + + public void disable() { + final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED); + if (!updated) { + throw new IllegalStateException("Port cannot be disabled because it is not stopped"); + } + } + + public void enable() { + final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED); + if (!updated) { + throw new IllegalStateException("Port cannot be enabled because it is not disabled"); + } + } + + @Override + public boolean isRunning() { + return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; + } + + @Override + public ScheduledState getScheduledState() { + return scheduledState.get(); + } + + @Override + public ConnectableType getConnectableType() { + return type; + } + + @Override + public void setYieldPeriod(final String yieldPeriod) { + final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); + if (yieldMillis < 0) { + throw new IllegalArgumentException("Yield duration must be positive"); + } + this.yieldPeriod.set(yieldPeriod); + } + + @Override + public void setScheduldingPeriod(final String schedulingPeriod) { + final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); + if (schedulingNanos < 0) { + throw new IllegalArgumentException("Scheduling Period must be positive"); + } + + this.schedulingPeriod.set(schedulingPeriod); + this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public String getPenalizationPeriod() { + return penalizationPeriod.get(); + } + + @Override + public void yield() { + final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); + yield(yieldMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void yield(final long yieldDuration, final TimeUnit timeUnit) { + final long yieldMillis = timeUnit.toMillis(yieldDuration); + yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); + } + + @Override + public long getYieldExpiration() { + return yieldExpiration.get(); + } + + @Override + public long getSchedulingPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); + } + + @Override + public String getSchedulingPeriod() { + return schedulingPeriod.get(); + } + + @Override + public void setPenalizationPeriod(final String penalizationPeriod) { + this.penalizationPeriod.set(penalizationPeriod); + } + + @Override + public String getYieldPeriod() { + return yieldPeriod.get(); + } + + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public void verifyCanDelete() throws IllegalStateException { + verifyCanDelete(false); + } + + @Override + public void verifyCanDelete(final boolean ignoreConnections) { + readLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException(this.getIdentifier() + " is running"); + } + + if (!ignoreConnections) { + for (final Connection connection : outgoingConnections) { + connection.verifyCanDelete(); + } + + for (final Connection connection : incomingConnections) { + if (connection.getSource().equals(this)) { + connection.verifyCanDelete(); + } else { + throw new IllegalStateException(this.getIdentifier() + " is the destination of another component"); + } + } + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanStart() { + readLock.lock(); + try { + switch (scheduledState.get()) { + case DISABLED: + throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled"); + case RUNNING: + throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running"); + case STOPPED: + break; + } + verifyNoActiveThreads(); + + final Collection validationResults = getValidationErrors(); + if (!validationResults.isEmpty()) { + throw new IllegalStateException(this.getIdentifier() + " is not in a valid state: " + validationResults.iterator().next().getExplanation()); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanStop() { + if (getScheduledState() != ScheduledState.RUNNING) { + throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run"); + } + } + + @Override + public void verifyCanUpdate() { + readLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException(this.getIdentifier() + " is not stopped"); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanEnable() { + readLock.lock(); + try { + if (getScheduledState() != ScheduledState.DISABLED) { + throw new IllegalStateException(this.getIdentifier() + " is not disabled"); + } + + verifyNoActiveThreads(); + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanDisable() { + readLock.lock(); + try { + if (getScheduledState() != ScheduledState.STOPPED) { + throw new IllegalStateException(this.getIdentifier() + " is not stopped"); + } + verifyNoActiveThreads(); + } finally { + readLock.unlock(); + } + } + + private void verifyNoActiveThreads() throws IllegalStateException { + final int threadCount = processScheduler.getActiveThreadCount(this); + if (threadCount > 0) { + throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active"); + } + } + + @Override + public void verifyCanClearState() { + } + + @Override + public Optional getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } +} -- cgit 1.2.3-korg