aboutsummaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors/actorServiceProvider
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2020-01-23 18:56:40 -0500
committerJim Hahn <jrh3@att.com>2020-02-03 16:38:39 -0500
commit89ce8bc6075096d015cdc8735634e0be14fe0357 (patch)
tree166892169d2a52ebdddafc0f2c8bba2308fdad0d /models-interactions/model-actors/actorServiceProvider
parentd789d26741b22fca83168ab209e517fbfbcefcc6 (diff)
Actor redesign.
Left original code intact so that it can continue to be used until everything has been converted to use the new approach. Simply added new methods and classes. (A few minor edits were required to the old code, e.g., added constructors to the Actor implementations). Code to be removed is annotated with "TODO". This only contains one revised actor, SDNC. This actor combines code from actor.sdnc, sdnc, and drools-applications. Coverage tests are incomplete, but I anticipate some simplification to this design in a couple of days; coverage will be added at that time. Issue-ID: POLICY-1625 Signed-off-by: Jim Hahn <jrh3@att.com> Change-Id: I4b75730e3621a9ee026ad10e557abe92df10dcf4
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider')
-rw-r--r--models-interactions/model-actors/actorServiceProvider/pom.xml55
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java167
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java65
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java58
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java114
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java90
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java239
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java757
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java153
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java210
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java112
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java69
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java57
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java68
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java89
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java115
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java157
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java52
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceProviderTest.java8
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java382
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentStringTest.java93
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DummyActor.java9
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java126
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java60
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java379
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java978
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartialTest.java212
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java314
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java130
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java80
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeExceptionTest.java82
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParamsTest.java80
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManagerTest.java142
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManagerTest.java134
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java254
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml42
36 files changed, 6086 insertions, 46 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/pom.xml b/models-interactions/model-actors/actorServiceProvider/pom.xml
index ee3a924f8..54b13f0ed 100644
--- a/models-interactions/model-actors/actorServiceProvider/pom.xml
+++ b/models-interactions/model-actors/actorServiceProvider/pom.xml
@@ -18,22 +18,47 @@
============LICENSE_END=========================================================
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
- <artifactId>model-actors</artifactId>
- <version>2.2.1-SNAPSHOT</version>
- </parent>
+ <parent>
+ <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
+ <artifactId>model-actors</artifactId>
+ <version>2.2.1-SNAPSHOT</version>
+ </parent>
- <artifactId>actorServiceProvider</artifactId>
+ <artifactId>actorServiceProvider</artifactId>
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
+ <artifactId>aai</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
+ <artifactId>events</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.policy.common</groupId>
+ <artifactId>policy-endpoints</artifactId>
+ <version>${policy.common.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
index 809936146..13f09b1ad 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ActorService
* ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,25 +21,56 @@
package org.onap.policy.controlloop.actorserviceprovider;
-import com.google.common.collect.ImmutableList;
-
-import java.util.Iterator;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.ServiceLoader;
-
+import java.util.Set;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.impl.StartConfigPartial;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ActorService {
-
+/**
+ * Service that manages a set of actors. To use the service, first invoke
+ * {@link #configure(Map)} to configure all of the actors, and then invoke
+ * {@link #start()} to start all of the actors. When finished using the actor service,
+ * invoke {@link #stop()} or {@link #shutdown()}.
+ */
+public class ActorService extends StartConfigPartial<Map<String, Object>> {
private static final Logger logger = LoggerFactory.getLogger(ActorService.class);
- private static ActorService service;
- // USed to load actors
- private final ServiceLoader<Actor> loader;
+ private final Map<String, Actor> name2actor;
+
+ private static class LazyHolder {
+ static final ActorService INSTANCE = new ActorService();
+ }
+
+ /**
+ * Constructs the object and loads the list of actors.
+ */
+ protected ActorService() {
+ super("actors");
+
+ Map<String, Actor> map = new HashMap<>();
+
+ for (Actor newActor : loadActors()) {
+ map.compute(newActor.getName(), (name, existingActor) -> {
+ if (existingActor == null) {
+ return newActor;
+ }
+
+ // TODO: should this throw an exception?
+ logger.warn("duplicate actor names for {}: {}, ignoring {}", name,
+ existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName());
+ return existingActor;
+ });
+ }
- private ActorService() {
- loader = ServiceLoader.load(Actor.class);
+ name2actor = ImmutableMap.copyOf(map);
}
/**
@@ -47,27 +78,115 @@ public class ActorService {
*
* @return the instance
*/
- public static synchronized ActorService getInstance() {
- if (service == null) {
- service = new ActorService();
+ public static ActorService getInstance() {
+ return LazyHolder.INSTANCE;
+ }
+
+ /**
+ * Gets a particular actor.
+ *
+ * @param name name of the actor of interest
+ * @return the desired actor
+ * @throws IllegalArgumentException if no actor by the given name exists
+ */
+ public Actor getActor(String name) {
+ Actor actor = name2actor.get(name);
+ if (actor == null) {
+ throw new IllegalArgumentException("unknown actor " + name);
}
- return service;
+
+ return actor;
}
/**
- * Get the actors.
+ * Gets the actors.
*
* @return the actors
*/
- public ImmutableList<Actor> actors() {
- Iterator<Actor> iter = loader.iterator();
- logger.debug("returning actors");
- while (iter.hasNext()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Got {}", iter.next().actor());
+ public Collection<Actor> getActors() {
+ return name2actor.values();
+ }
+
+ /**
+ * Gets the names of the actors.
+ *
+ * @return the actor names
+ */
+ public Set<String> getActorNames() {
+ return name2actor.keySet();
+ }
+
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ logger.info("configuring actors");
+
+ BeanValidationResult valres = new BeanValidationResult("ActorService", parameters);
+
+ for (Actor actor : name2actor.values()) {
+ String actorName = actor.getName();
+ Map<String, Object> subparams = Util.translateToMap(actorName, parameters.get(actorName));
+
+ if (subparams != null) {
+
+ try {
+ actor.configure(subparams);
+
+ } catch (ParameterValidationRuntimeException e) {
+ logger.warn("failed to configure actor {}", actorName, e);
+ valres.addResult(e.getResult());
+
+ } catch (RuntimeException e) {
+ logger.warn("failed to configure actor {}", actorName, e);
+ }
+
+ } else if (actor.isConfigured()) {
+ logger.warn("missing configuration parameters for actor {}; using previous parameters", actorName);
+
+ } else {
+ logger.warn("missing configuration parameters for actor {}; actor cannot be started", actorName);
}
}
- return ImmutableList.copyOf(loader.iterator());
+ if (!valres.isValid() && logger.isWarnEnabled()) {
+ logger.warn("actor services validation errors:\n{}", valres.getResult());
+ }
+ }
+
+ @Override
+ protected void doStart() {
+ logger.info("starting actors");
+
+ for (Actor actor : name2actor.values()) {
+ if (actor.isConfigured()) {
+ Util.logException(actor::start, "failed to start actor {}", actor.getName());
+
+ } else {
+ logger.warn("not starting unconfigured actor {}", actor.getName());
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ logger.info("stopping actors");
+ name2actor.values()
+ .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName()));
+ }
+
+ @Override
+ protected void doShutdown() {
+ logger.info("shutting down actors");
+
+ // @formatter:off
+ name2actor.values().forEach(
+ actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName()));
+
+ // @formatter:on
+ }
+
+ // the following methods may be overridden by junit tests
+
+ protected Iterable<Actor> loadActors() {
+ return ServiceLoader.load(Actor.class);
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java
new file mode 100644
index 000000000..b7a9a53ad
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import lombok.AllArgsConstructor;
+
+/**
+ * Object whose {@link #toString()} method invokes {@link Object#toString()} on another
+ * object, on-demand. This assumes that the other object's method returns an object
+ * identifier. This is typically used to include an object's identifier in a log message.
+ */
+@AllArgsConstructor
+public class DelayedIdentString {
+ /**
+ * String to return for null objects or null object identifiers.
+ */
+ public static final String NULL_STRING = "null";
+
+ private final Object object;
+
+ /**
+ * Gets the object's identifier, after stripping anything appearing before '@'.
+ */
+ @Override
+ public String toString() {
+ if (object == null) {
+ return NULL_STRING;
+ }
+
+ String ident = objectToString();
+ if (ident == null) {
+ return NULL_STRING;
+ }
+
+ int index = ident.indexOf('@');
+ return (index > 0 ? ident.substring(index) : ident);
+ }
+
+ /**
+ * Invokes the object's {@link Object#toString()} method.
+ *
+ * @return the output from the object's {@link Object#toString()} method
+ */
+ protected String objectToString() {
+ return object.toString();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
new file mode 100644
index 000000000..e308ee42e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+/**
+ * This is the service interface for defining an Actor operation used in Control Loop
+ * Operational Policies for performing actions on runtime entities.
+ */
+public interface Operator extends Startable, Configurable<Map<String, Object>> {
+
+ /**
+ * Gets the name of the associated actor.
+ *
+ * @return the name of the associated actor
+ */
+ String getActorName();
+
+ /**
+ * Gets the name of the operation.
+ *
+ * @return the operation name
+ */
+ String getName();
+
+ /**
+ * Called by enforcement PDP engine to start the operation. As part of the operation,
+ * it invokes the "start" and "complete" call-backs found within the parameters.
+ *
+ * @param params parameters needed to start the operation
+ * @return a future that can be used to cancel or await the result of the operation
+ */
+ CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params);
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
new file mode 100644
index 000000000..0aba1a7fa
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
@@ -0,0 +1,114 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Actor utilities.
+ */
+public class Util {
+ private static final Logger logger = LoggerFactory.getLogger(Util.class);
+
+ private Util() {
+ // do nothing
+ }
+
+ /**
+ * Extracts an object's identity by invoking {@link Object#toString()} and returning
+ * the portion starting with "@". Extraction is done on-demand, when toString() is
+ * called on the result. This is typically used when logging.
+ *
+ * @param object object whose identity is to be extracted
+ * @return an object that will extract the source object's identity when this object's
+ * toString() method is called
+ */
+ public static Object ident(Object object) {
+ return new DelayedIdentString(object);
+ }
+
+ /**
+ * Runs a function and logs a message if it throws an exception. Does <i>not</i>
+ * re-throw the exception.
+ *
+ * @param function function to be run
+ * @param exceptionMessage message to log if an exception is thrown
+ * @param exceptionArgs arguments to be passed to the logger
+ */
+ public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) {
+ try {
+ function.run();
+
+ } catch (RuntimeException ex) {
+ // create a new array containing the original arguments plus the exception
+ Object[] allArgs = Arrays.copyOf(exceptionArgs, exceptionArgs.length + 1);
+ allArgs[exceptionArgs.length] = ex;
+
+ logger.warn(exceptionMessage, allArgs);
+ }
+ }
+
+ /**
+ * Translates parameters from one class to another, typically from a Map to a POJO or
+ * vice versa.
+ *
+ * @param identifier identifier of the actor/operation being translated; used to build
+ * an exception message
+ * @param source source object to be translated
+ * @param clazz target class
+ * @return the translated object
+ */
+ public static <T> T translate(String identifier, Object source, Class<T> clazz) {
+ Coder coder = new StandardCoder();
+
+ try {
+ String json = coder.encode(source);
+ return coder.decode(json, clazz);
+
+ } catch (CoderException | RuntimeException e) {
+ throw new IllegalArgumentException("cannot translate parameters for " + identifier, e);
+ }
+ }
+
+ /**
+ * Translates parameters to a Map.
+ *
+ * @param identifier identifier of the actor/operation being translated; used to build
+ * an exception message
+ * @param source source parameters
+ * @return the parameters, as a Map
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> translateToMap(String identifier, Object source) {
+ if (source == null) {
+ return null;
+ }
+
+ return translate(identifier, source, LinkedHashMap.class);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
new file mode 100644
index 000000000..68bbe7edc
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
@@ -0,0 +1,90 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.controlloop;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.aai.AaiCqResponse;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+
+/**
+ * Context associated with a control loop event.
+ */
+@Getter
+@Setter
+public class ControlLoopEventContext implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Setter(AccessLevel.NONE)
+ private final VirtualControlLoopEvent event;
+
+ private AaiCqResponse aaiCqResponse;
+
+ // TODO may remove this if it proves not to be needed
+ @Getter(AccessLevel.NONE)
+ @Setter(AccessLevel.NONE)
+ private Map<String, Serializable> properties = new ConcurrentHashMap<>();
+
+ /**
+ * Constructs the object.
+ *
+ * @param event event with which this is associated
+ */
+ public ControlLoopEventContext(VirtualControlLoopEvent event) {
+ this.event = event;
+ }
+
+ /**
+ * Determines if the context contains a property.
+ *
+ * @param name name of the property of interest
+ * @return {@code true} if the context contains the property, {@code false} otherwise
+ */
+ public boolean contains(String name) {
+ return properties.containsKey(name);
+ }
+
+ /**
+ * Gets a property, casting it to the desired type.
+ *
+ * @param <T> desired type
+ * @param name name of the property whose value is to be retrieved
+ * @return the property's value, or {@code null} if it does not yet have a value
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T getProperty(String name) {
+ return (T) properties.get(name);
+ }
+
+ /**
+ * Sets a property's value.
+ *
+ * @param name property name
+ * @param value new property value
+ */
+ public void setProperty(String name, Serializable value) {
+ properties.put(name, value);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
new file mode 100644
index 000000000..9b9aa914e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
@@ -0,0 +1,239 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of an actor.
+ */
+public class ActorImpl extends StartConfigPartial<Map<String, Object>> implements Actor {
+ private static final Logger logger = LoggerFactory.getLogger(ActorImpl.class);
+
+ /**
+ * Maps a name to an operator.
+ */
+ private Map<String, Operator> name2operator;
+
+ /**
+ * Constructs the object.
+ *
+ * @param name actor name
+ * @param operators the operations supported by this actor
+ */
+ public ActorImpl(String name, Operator... operators) {
+ super(name);
+ setOperators(Arrays.asList(operators));
+ }
+
+ /**
+ * Sets the operators supported by this actor, overriding any previous list.
+ *
+ * @param operators the operations supported by this actor
+ */
+ protected void setOperators(List<Operator> operators) {
+ if (isConfigured()) {
+ throw new IllegalStateException("attempt to set operators on a configured actor: " + getName());
+ }
+
+ Map<String, Operator> map = new HashMap<>();
+ for (Operator newOp : operators) {
+ map.compute(newOp.getName(), (opName, existingOp) -> {
+ if (existingOp == null) {
+ return newOp;
+ }
+
+ // TODO: should this throw an exception?
+ logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName,
+ existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName());
+ return existingOp;
+ });
+ }
+
+ this.name2operator = ImmutableMap.copyOf(map);
+ }
+
+ @Override
+ public String getName() {
+ return getFullName();
+ }
+
+ @Override
+ public Operator getOperator(String name) {
+ Operator operator = name2operator.get(name);
+ if (operator == null) {
+ throw new IllegalArgumentException("unknown operation " + getName() + "." + name);
+ }
+
+ return operator;
+ }
+
+ @Override
+ public Collection<Operator> getOperators() {
+ return name2operator.values();
+ }
+
+ @Override
+ public Set<String> getOperationNames() {
+ return name2operator.keySet();
+ }
+
+ /**
+ * For each operation, it looks for a set of parameters by the same name and, if
+ * found, configures the operation with the parameters.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ final String actorName = getName();
+ logger.info("configuring operations for actor {}", actorName);
+
+ BeanValidationResult valres = new BeanValidationResult(actorName, parameters);
+
+ // function that creates operator-specific parameters, given the operation name
+ Function<String, Map<String, Object>> opParamsMaker = makeOperatorParameters(parameters);
+
+ for (Operator operator : name2operator.values()) {
+ String operName = operator.getName();
+ Map<String, Object> subparams = opParamsMaker.apply(operName);
+
+ if (subparams != null) {
+
+ try {
+ operator.configure(subparams);
+
+ } catch (ParameterValidationRuntimeException e) {
+ logger.warn("failed to configure operation {}.{}", actorName, operName, e);
+ valres.addResult(e.getResult());
+
+ } catch (RuntimeException e) {
+ logger.warn("failed to configure operation {}.{}", actorName, operName, e);
+ }
+
+ } else if (operator.isConfigured()) {
+ logger.warn("missing configuration parameters for operation {}.{}; using previous parameters",
+ actorName, operName);
+
+ } else {
+ logger.warn("missing configuration parameters for operation {}.{}; operation cannot be started",
+ actorName, operName);
+ }
+ }
+ }
+
+ /**
+ * Extracts the operator parameters from the actor parameters, for a given operator.
+ * This method assumes each operation has its own set of parameters.
+ *
+ * @param actorParameters actor parameters
+ * @return a function to extract the operator parameters from the actor parameters.
+ * Note: this function may return {@code null} if there are no parameters for
+ * the given operation name
+ */
+ protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+
+ return operName -> Util.translateToMap(getName() + "." + operName, actorParameters.get(operName));
+ }
+
+ /**
+ * Starts each operation.
+ */
+ @Override
+ protected void doStart() {
+ final String actorName = getName();
+ logger.info("starting operations for actor {}", actorName);
+
+ for (Operator oper : name2operator.values()) {
+ if (oper.isConfigured()) {
+ Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName());
+
+ } else {
+ logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName());
+ }
+ }
+ }
+
+ /**
+ * Stops each operation.
+ */
+ @Override
+ protected void doStop() {
+ final String actorName = getName();
+ logger.info("stopping operations for actor {}", actorName);
+
+ // @formatter:off
+ name2operator.values().forEach(
+ oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName()));
+ // @formatter:on
+ }
+
+ /**
+ * Shuts down each operation.
+ */
+ @Override
+ protected void doShutdown() {
+ final String actorName = getName();
+ logger.info("shutting down operations for actor {}", actorName);
+
+ // @formatter:off
+ name2operator.values().forEach(oper -> Util.logException(oper::shutdown,
+ "failed to shutdown operation {}.{}", actorName, oper.getName()));
+ // @formatter:on
+ }
+
+ // TODO old code: remove lines down to **HERE**
+
+ @Override
+ public String actor() {
+ return null;
+ }
+
+ @Override
+ public List<String> recipes() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> recipeTargets(String recipe) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> recipePayloads(String recipe) {
+ return Collections.emptyList();
+ }
+
+ // **HERE**
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
new file mode 100644
index 000000000..80d8fbd04
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
@@ -0,0 +1,757 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import lombok.Getter;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Partial implementation of an operator. Subclasses can choose to simply implement
+ * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
+ * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
+ */
+public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
+
+ private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
+
+ private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
+ private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
+ private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
+
+ @Getter
+ private final String actorName;
+
+ @Getter
+ private final String name;
+
+ /**
+ * Constructs the object.
+ *
+ * @param actorName name of the actor with which this operator is associated
+ * @param name operation name
+ */
+ public OperatorPartial(String actorName, String name) {
+ super(actorName + "." + name);
+ this.actorName = actorName;
+ this.name = name;
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doStart() {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doStop() {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doShutdown() {
+ // do nothing
+ }
+
+ @Override
+ public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
+ if (!isAlive()) {
+ throw new IllegalStateException("operation is not running: " + getFullName());
+ }
+
+ final Executor executor = params.getExecutor();
+
+ // allocate a controller for the entire operation
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
+ if (preproc == null) {
+ // no preprocessor required - just start the operation
+ return startOperationAttempt(params, controller, 1);
+ }
+
+ // propagate "stop" to the preprocessor
+ controller.add(preproc);
+
+ /*
+ * Do preprocessor first and then, if successful, start the operation. Note:
+ * operations create their own outcome, ignoring the outcome from any previous
+ * steps.
+ */
+ preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
+ .thenComposeAsync(handleFailure(params, controller), executor)
+ .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
+ executor);
+
+ return controller;
+ }
+
+ /**
+ * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
+ * invokes the started and completed call-backs.
+ *
+ * @param params operation parameters
+ * @return a future that will return the preprocessor outcome, or {@code null} if this
+ * operation needs no preprocessor
+ */
+ protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
+ logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final ControlLoopOperation operation = params.makeOutcome();
+
+ final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
+ doPreprocessorAsFuture(params);
+ if (preproc == null) {
+ // no preprocessor required
+ return null;
+ }
+
+ // allocate a controller for the preprocessor steps
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ /*
+ * Don't mark it complete until we've built the whole pipeline. This will prevent
+ * the operation from starting until after it has been successfully built (i.e.,
+ * without generating any exceptions).
+ */
+ final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
+
+ // @formatter:off
+ firstFuture
+ .thenComposeAsync(controller.add(preproc), executor)
+ .exceptionally(fromException(params, operation))
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ // start the pipeline
+ firstFuture.complete(operation);
+
+ return controller;
+ }
+
+ /**
+ * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
+ * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
+ * outcome that it received.
+ *
+ * @param params operation parameters
+ * @param controller pipeline controller
+ * @return a function that checks the outcome status and continues, if successful, or
+ * indicates a failure otherwise
+ */
+ private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
+
+ return outcome -> {
+
+ if (outcome != null && isSuccess(outcome)) {
+ logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(outcome);
+ }
+
+ logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final CallbackManager callbacks = new CallbackManager();
+
+ // propagate "stop" to the callbacks
+ controller.add(callbacks);
+
+ final ControlLoopOperation outcome2 = params.makeOutcome();
+
+ // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
+
+ outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
+ outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
+
+ CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+
+ return controller;
+ };
+ }
+
+ /**
+ * Invokes the operation's preprocessor step(s) as a "future". This method simply
+ * returns {@code null}.
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @param params operation parameters
+ * @return a function that will start the preprocessor and returns its outcome, or
+ * {@code null} if this operation needs no preprocessor
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
+ ControlLoopOperationParams params) {
+ return null;
+ }
+
+ /**
+ * Starts the operation attempt, with no preprocessor. When all retries complete, it
+ * will complete the controller.
+ *
+ * @param params operation parameters
+ * @param controller controller for all operation attempts
+ * @param attempt attempt number, typically starting with 1
+ * @return a future that will return the final result of all attempts
+ */
+ private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
+ PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
+
+ final Executor executor = params.getExecutor();
+
+ CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
+
+ // propagate "stop" to the operation attempt
+ controller.add(future);
+
+ // detach when complete
+ future.whenCompleteAsync(controller.delayedRemove(future), executor)
+ .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+
+ return controller;
+ }
+
+ /**
+ * Starts the operation attempt, without doing any retries.
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @return a future that will return the result of a single operation attempt
+ */
+ private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
+ int attempt) {
+
+ logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final ControlLoopOperation outcome = params.makeOutcome();
+ final CallbackManager callbacks = new CallbackManager();
+
+ // this operation attempt gets its own controller
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ // propagate "stop" to the callbacks
+ controller.add(callbacks);
+
+ /*
+ * Don't mark it complete until we've built the whole pipeline. This will prevent
+ * the operation from starting until after it has been successfully built (i.e.,
+ * without generating any exceptions).
+ */
+ final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
+
+ // @formatter:off
+ CompletableFuture<ControlLoopOperation> future2 =
+ firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
+ .thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
+ // @formatter:on
+
+ // handle timeouts, if specified
+ long timeoutMillis = getTimeOutMillis(params.getPolicy());
+ if (timeoutMillis > 0) {
+ logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
+ future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /*
+ * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
+ * before callbackCompleted() is invoked.
+ *
+ * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
+ * the pipeline as the last step anyway.
+ */
+
+ // @formatter:off
+ future2.exceptionally(fromException(params, outcome))
+ .thenApplyAsync(setRetryFlag(params, attempt), executor)
+ .thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ // start the pipeline
+ firstFuture.complete(outcome);
+
+ return controller;
+ }
+
+ /**
+ * Determines if the outcome was successful.
+ *
+ * @param outcome outcome to examine
+ * @return {@code true} if the outcome was successful
+ */
+ protected boolean isSuccess(ControlLoopOperation outcome) {
+ return OUTCOME_SUCCESS.equals(outcome.getOutcome());
+ }
+
+ /**
+ * Determines if the outcome was a failure for this operator.
+ *
+ * @param outcome outcome to examine, or {@code null}
+ * @return {@code true} if the outcome is not {@code null} and was a failure
+ * <i>and</i> was associated with this operator, {@code false} otherwise
+ */
+ protected boolean isActorFailed(ControlLoopOperation outcome) {
+ return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
+ }
+
+ /**
+ * Invokes the operation as a "future". This method simply invokes
+ * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>verifyRunning() has been invoked</li>
+ * <li>callbackStarted() has been invoked</li>
+ * <li>the invoker will perform appropriate timeout checks</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @return a function that will start the operation and return its result when
+ * complete
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
+ ControlLoopOperationParams params, int attempt) {
+
+ /*
+ * TODO As doOperation() may perform blocking I/O, this should be launched in its
+ * own thread to prevent the ForkJoinPool from being tied up. Should probably
+ * provide a method to make that easy.
+ */
+
+ return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
+ params.getExecutor());
+ }
+
+ /**
+ * Low-level method that performs the operation. This can make the same assumptions
+ * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
+ * method throws an {@link UnsupportedOperationException}.
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @param operation the operation being performed
+ * @return the outcome of the operation
+ */
+ protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
+ ControlLoopOperation operation) {
+
+ throw new UnsupportedOperationException("start operation " + getFullName());
+ }
+
+ /**
+ * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
+ * FAILURE, assuming the policy specifies retries and the retry count has been
+ * exhausted.
+ *
+ * @param params operation parameters
+ * @param attempt latest attempt number, starting with 1
+ * @return a function to get the next future to execute
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
+ int attempt) {
+
+ return operation -> {
+ if (operation != null && !isActorFailed(operation)) {
+ /*
+ * wrong type or wrong operation - just leave it as is. No need to log
+ * anything here, as retryOnFailure() will log a message
+ */
+ return operation;
+ }
+
+ // get a non-null operation
+ ControlLoopOperation oper2;
+ if (operation != null) {
+ oper2 = operation;
+ } else {
+ oper2 = params.makeOutcome();
+ oper2.setOutcome(OUTCOME_FAILURE);
+ }
+
+ if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
+ && attempt > params.getPolicy().getRetry()) {
+ /*
+ * retries were specified and we've already tried them all - change to
+ * FAILURE_RETRIES
+ */
+ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
+ oper2.setOutcome(OUTCOME_RETRIES);
+ }
+
+ return oper2;
+ };
+ }
+
+ /**
+ * Restarts the operation if it was a FAILURE. Assumes that
+ * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
+ * thus that the "operation" is not {@code null}.
+ *
+ * @param params operation parameters
+ * @param controller controller for all of the retries
+ * @param attempt latest attempt number, starting with 1
+ * @return a function to get the next future to execute
+ */
+ private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
+ int attempt) {
+
+ return operation -> {
+ if (!isActorFailed(operation)) {
+ // wrong type or wrong operation - just leave it as is
+ logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+
+ if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
+ // no retries - already marked as FAILURE, so just return it
+ logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+
+
+ /*
+ * Retry the operation.
+ */
+ logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
+
+ return startOperationAttempt(params, controller, attempt + 1);
+ };
+ }
+
+ /**
+ * Gets the outcome of an operation for this operation.
+ *
+ * @param operation operation whose outcome is to be extracted
+ * @return the outcome of the given operation, if it's for this operator, {@code null}
+ * otherwise
+ */
+ protected String getActorOutcome(ControlLoopOperation operation) {
+ if (operation == null) {
+ return null;
+ }
+
+ if (!getActorName().equals(operation.getActor())) {
+ return null;
+ }
+
+ if (!getName().equals(operation.getOperation())) {
+ return null;
+ }
+
+ return operation.getOutcome();
+ }
+
+ /**
+ * Gets a function that will start the next step, if the current operation was
+ * successful, or just return the current operation, otherwise.
+ *
+ * @param params operation parameters
+ * @param nextStep function that will invoke the next step, passing it the operation
+ * @return a function that will start the next step
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
+ ControlLoopOperationParams params,
+ Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
+
+ return operation -> {
+
+ if (operation == null) {
+ logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
+ ControlLoopOperation outcome = params.makeOutcome();
+ outcome.setOutcome(OUTCOME_FAILURE);
+ return CompletableFuture.completedFuture(outcome);
+
+ } else if (isSuccess(operation)) {
+ logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
+ return nextStep.apply(operation);
+
+ } else {
+ logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+ };
+ }
+
+ /**
+ * Converts an exception into an operation outcome, returning a copy of the outcome to
+ * prevent background jobs from changing it.
+ *
+ * @param params operation parameters
+ * @param operation current operation
+ * @return a function that will convert an exception into an operation outcome
+ */
+ private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
+ ControlLoopOperation operation) {
+
+ return thrown -> {
+ logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
+ params.getRequestId(), thrown);
+
+ /*
+ * Must make a copy of the operation, as the original could be changed by
+ * background jobs that might still be running.
+ */
+ return setOutcome(params, new ControlLoopOperation(operation), thrown);
+ };
+ }
+
+ /**
+ * Gets a function to verify that the operation is still running. If the pipeline is
+ * not running, then it returns an incomplete future, which will effectively halt
+ * subsequent operations in the pipeline. This method is intended to be used with one
+ * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
+ *
+ * @param controller pipeline controller
+ * @param params operation parameters
+ * @return a function to verify that the operation is still running
+ */
+ protected <T> Function<T, CompletableFuture<T>> verifyRunning(
+ PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
+
+ return value -> {
+ boolean running = controller.isRunning();
+ logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
+
+ return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
+ };
+ }
+
+ /**
+ * Sets the start time of the operation and invokes the callback to indicate that the
+ * operation has started. Does nothing if the pipeline has been stopped.
+ * <p/>
+ * This assumes that the "outcome" is not {@code null}.
+ *
+ * @param params operation parameters
+ * @param callbacks used to determine if the start callback can be invoked
+ * @return a function that sets the start time and invokes the callback
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
+ CallbackManager callbacks) {
+
+ return outcome -> {
+
+ if (callbacks.canStart()) {
+ // haven't invoked "start" callback yet
+ outcome.setStart(callbacks.getStartTime());
+ outcome.setEnd(null);
+ params.callbackStarted(outcome);
+ }
+
+ return outcome;
+ };
+ }
+
+ /**
+ * Sets the end time of the operation and invokes the callback to indicate that the
+ * operation has completed. Does nothing if the pipeline has been stopped.
+ * <p/>
+ * This assumes that the "outcome" is not {@code null}.
+ * <p/>
+ * Note: the start time must be a reference rather than a plain value, because it's
+ * value must be gotten on-demand, when the returned function is executed at a later
+ * time.
+ *
+ * @param params operation parameters
+ * @param callbacks used to determine if the end callback can be invoked
+ * @return a function that sets the end time and invokes the callback
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
+ CallbackManager callbacks) {
+
+ return operation -> {
+
+ if (callbacks.canEnd()) {
+ operation.setStart(callbacks.getStartTime());
+ operation.setEnd(callbacks.getEndTime());
+ params.callbackCompleted(operation);
+ }
+
+ return operation;
+ };
+ }
+
+ /**
+ * Sets an operation's outcome and message, based on a throwable.
+ *
+ * @param params operation parameters
+ * @param operation operation to be updated
+ * @return the updated operation
+ */
+ protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ Throwable thrown) {
+ PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
+ return setOutcome(params, operation, result);
+ }
+
+ /**
+ * Sets an operation's outcome and default message based on the result.
+ *
+ * @param params operation parameters
+ * @param operation operation to be updated
+ * @param result result of the operation
+ * @return the updated operation
+ */
+ protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ PolicyResult result) {
+ logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
+ operation.setOutcome(result.toString());
+ operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
+ : ControlLoopOperation.FAILED_MSG);
+
+ return operation;
+ }
+
+ /**
+ * Determines if a throwable is due to a timeout.
+ *
+ * @param thrown throwable of interest
+ * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
+ */
+ protected boolean isTimeout(Throwable thrown) {
+ if (thrown instanceof CompletionException) {
+ thrown = thrown.getCause();
+ }
+
+ return (thrown instanceof TimeoutException);
+ }
+
+ // these may be overridden by junit tests
+
+ /**
+ * Gets the operation timeout. Subclasses may override this method to obtain the
+ * timeout in some other way (e.g., through configuration properties).
+ *
+ * @param policy policy from which to extract the timeout
+ * @return the operation timeout, in milliseconds
+ */
+ protected long getTimeOutMillis(Policy policy) {
+ Integer timeoutSec = policy.getTimeout();
+ return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Manager for "start" and "end" callbacks.
+ */
+ private static class CallbackManager implements Runnable {
+ private final AtomicReference<Instant> startTime = new AtomicReference<>();
+ private final AtomicReference<Instant> endTime = new AtomicReference<>();
+
+ /**
+ * Determines if the "start" callback can be invoked. If so, it sets the
+ * {@link #startTime} to the current time.
+ *
+ * @return {@code true} if the "start" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canStart() {
+ return startTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Determines if the "end" callback can be invoked. If so, it sets the
+ * {@link #endTime} to the current time.
+ *
+ * @return {@code true} if the "end" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canEnd() {
+ return endTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Gets the start time.
+ *
+ * @return the start time, or {@code null} if {@link #canStart()} has not been
+ * invoked yet.
+ */
+ public Instant getStartTime() {
+ return startTime.get();
+ }
+
+ /**
+ * Gets the end time.
+ *
+ * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
+ * yet.
+ */
+ public Instant getEndTime() {
+ return endTime.get();
+ }
+
+ /**
+ * Prevents further callbacks from being executed by setting {@link #startTime}
+ * and {@link #endTime}.
+ */
+ @Override
+ public void run() {
+ canStart();
+ canEnd();
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java
new file mode 100644
index 000000000..6c883f1b5
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java
@@ -0,0 +1,153 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import lombok.Getter;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Partial implementation of an object that is both startable and configurable. It
+ * provides the high level methods defined in the interface, while deferring the details
+ * to abstract methods that must be provided by the subclasses. It also manages the
+ * current {@link #state}.
+ *
+ * @param <T> type of parameters expected by {@link #configure(Object)}
+ */
+public abstract class StartConfigPartial<T> implements Startable, Configurable<T> {
+ private static final Logger logger = LoggerFactory.getLogger(StartConfigPartial.class);
+
+ @Getter
+ private final String fullName;
+
+ public enum State {
+ IDLE, CONFIGURED, ALIVE
+ }
+
+ private State state = State.IDLE;
+
+ /**
+ * Constructs the object.
+ *
+ * @param fullName full name of this object, used for logging and exception purposes
+ */
+ public StartConfigPartial(String fullName) {
+ this.fullName = fullName;
+ }
+
+ @Override
+ public synchronized boolean isAlive() {
+ return (state == State.ALIVE);
+ }
+
+ /**
+ * Determines if this object has been configured.
+ *
+ * @return {@code true} if this object has been configured, {@code false} otherwise
+ */
+ public synchronized boolean isConfigured() {
+ return (state != State.IDLE);
+ }
+
+ @Override
+ public synchronized void configure(T parameters) {
+ if (isAlive()) {
+ throw new IllegalStateException("attempt to reconfigure, but already running " + getFullName());
+ }
+
+ logger.info("initializing {}", getFullName());
+
+ doConfigure(parameters);
+
+ state = State.CONFIGURED;
+ }
+
+ @Override
+ public synchronized boolean start() {
+ switch (state) {
+ case ALIVE:
+ logger.info("{} is already running", getFullName());
+ break;
+
+ case CONFIGURED:
+ logger.info("starting {}", getFullName());
+ doStart();
+ state = State.ALIVE;
+ break;
+
+ case IDLE:
+ default:
+ throw new IllegalStateException("attempt to start unconfigured " + getFullName());
+ }
+
+ return true;
+ }
+
+ @Override
+ public synchronized boolean stop() {
+ if (isAlive()) {
+ logger.info("stopping {}", getFullName());
+ state = State.CONFIGURED;
+ doStop();
+
+ } else {
+ logger.info("{} is not running", getFullName());
+ }
+
+ return true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (!isAlive()) {
+ logger.info("{} is not running", getFullName());
+ return;
+ }
+
+ logger.info("shutting down actor {}", getFullName());
+ state = State.CONFIGURED;
+ doShutdown();
+ }
+
+ /**
+ * Configures this object.
+ *
+ * @param parameters configuration parameters
+ */
+ protected abstract void doConfigure(T parameters);
+
+ /**
+ * Starts this object.
+ */
+ protected abstract void doStart();
+
+ /**
+ * Stops this object.
+ */
+ protected abstract void doStop();
+
+ /**
+ * Shuts down this object.
+ */
+ protected abstract void doShutdown();
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
new file mode 100644
index 000000000..08aba81f2
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
@@ -0,0 +1,210 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Consumer;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.annotations.NotNull;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.policy.Policy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Parameters for control loop operations. The executor defaults to
+ * {@link ForkJoinPool#commonPool()}, but may be overridden.
+ */
+@Getter
+@Builder(toBuilder = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+public class ControlLoopOperationParams {
+
+ private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class);
+
+ public static final String UNKNOWN = "-unknown-";
+
+
+ /**
+ * The actor service in which to find the actor/operation.
+ */
+ @NotNull
+ private ActorService actorService;
+
+ /**
+ * The event for which the operation applies.
+ */
+ @NotNull
+ private ControlLoopEventContext context;
+
+ /**
+ * The executor to use to run the operation.
+ */
+ @NotNull
+ @Builder.Default
+ private Executor executor = ForkJoinPool.commonPool();
+
+ /**
+ * The policy associated with the operation.
+ */
+ @NotNull
+ private Policy policy;
+
+ /**
+ * The function to invoke when the operation starts. This is optional.
+ * <p/>
+ * Note: this may be invoked multiple times, but with different actor/operations. That
+ * may happen if the current operation requires other operations to be performed first
+ * (e.g., A&AI queries, guard checks).
+ */
+ private Consumer<ControlLoopOperation> startCallback;
+
+ /**
+ * The function to invoke when the operation completes. This is optional.
+ * <p/>
+ * Note: this may be invoked multiple times, but with different actor/operations. That
+ * may happen if the current operation requires other operations to be performed first
+ * (e.g., A&AI queries, guard checks).
+ */
+ private Consumer<ControlLoopOperation> completeCallback;
+
+ /**
+ * Target entity.
+ */
+ @NotNull
+ private String target;
+
+ /**
+ * Starts the specified operation.
+ *
+ * @return a future that will return the result of the operation
+ * @throws IllegalArgumentException if the parameters are invalid
+ */
+ public CompletableFuture<ControlLoopOperation> start() {
+ BeanValidationResult result = validate();
+ if (!result.isValid()) {
+ logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(),
+ result.getResult());
+ throw new IllegalArgumentException("invalid parameters");
+ }
+
+ // @formatter:off
+ return actorService
+ .getActor(policy.getActor())
+ .getOperator(policy.getRecipe())
+ .startOperation(this);
+ // @formatter:on
+ }
+
+ /**
+ * Gets the name of the actor from the policy.
+ *
+ * @return the actor name, or {@link #UNKNOWN} if no name is available
+ */
+ public String getActor() {
+ return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor());
+ }
+
+ /**
+ * Gets the name of the operation from the policy.
+ *
+ * @return the operation name, or {@link #UNKNOWN} if no name is available
+ */
+ public String getOperation() {
+ return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe());
+ }
+
+ /**
+ * Gets the requested ID of the associated event.
+ *
+ * @return the event's request ID, or {@code null} if no request ID is available
+ */
+ public UUID getRequestId() {
+ return (context == null || context.getEvent() == null ? null : context.getEvent().getRequestId());
+ }
+
+ /**
+ * Makes an operation outcome, populating it from the parameters.
+ *
+ * @return a new operation outcome
+ */
+ public ControlLoopOperation makeOutcome() {
+ ControlLoopOperation operation = new ControlLoopOperation();
+ operation.setActor(getActor());
+ operation.setOperation(getOperation());
+ operation.setTarget(target);
+
+ return operation;
+ }
+
+ /**
+ * Invokes the callback to indicate that the operation has started. Any exceptions
+ * generated by the callback are logged, but not re-thrown.
+ *
+ * @param operation the operation that is being started
+ */
+ public void callbackStarted(ControlLoopOperation operation) {
+ logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId());
+
+ if (startCallback != null) {
+ Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}",
+ operation.getActor(), operation.getOperation(), getRequestId());
+ }
+ }
+
+ /**
+ * Invokes the callback to indicate that the operation has completed. Any exceptions
+ * generated by the callback are logged, but not re-thrown.
+ *
+ * @param operation the operation that is being started
+ */
+ public void callbackCompleted(ControlLoopOperation operation) {
+ logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(),
+ operation.getOutcome(), getRequestId());
+
+ if (completeCallback != null) {
+ Util.logException(() -> completeCallback.accept(operation),
+ "{}.{}: complete-callback threw an exception for {}", operation.getActor(),
+ operation.getOperation(), getRequestId());
+ }
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @return the validation result
+ */
+ public BeanValidationResult validate() {
+ return new BeanValidator().validateTop(ControlLoopOperationParams.class.getSimpleName(), this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
new file mode 100644
index 000000000..da4fb4f0c
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import java.util.Map;
+import java.util.function.Function;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+/**
+ * Parameters used by Actors that connect to a server via HTTP. This contains the
+ * parameters that are common to all of the operations. Only the path changes for each
+ * operation, thus it includes a mapping from operation name to path.
+ */
+@Data
+@NotNull
+@NotBlank
+public class HttpActorParams {
+
+ /**
+ * Name of the HttpClient, as found in the HttpClientFactory.
+ */
+ private String clientName;
+
+ /**
+ * Amount of time, in seconds to wait for the HTTP request to complete, where zero
+ * indicates that it should wait forever. The default is zero.
+ */
+ @Min(0)
+ private long timeoutSec = 0;
+
+ /**
+ * Maps the operation name to its URI path.
+ */
+ private Map<String, String> path;
+
+ /**
+ * Extracts a specific operation's parameters from "this".
+ *
+ * @param name name of the item containing "this"
+ * @return a function to extract an operation's parameters from "this". Note: the
+ * returned function is not thread-safe
+ */
+ public Function<String, Map<String, Object>> makeOperationParameters(String name) {
+ HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build();
+
+ return operation -> {
+ String subpath = path.get(operation);
+ if (subpath == null) {
+ return null;
+ }
+
+ subparams.setPath(subpath);
+ return Util.translateToMap(name + "." + operation, subparams);
+ };
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @param name name of the object containing these parameters
+ * @return "this"
+ * @throws IllegalArgumentException if the parameters are invalid
+ */
+ public HttpActorParams doValidation(String name) {
+ ValidationResult result = validate(name);
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+
+ return this;
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
+
+ result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
+
+ return result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
new file mode 100644
index 000000000..695ffe4dd
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Builder;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+/**
+ * Parameters used by Operators that connect to a server via HTTP.
+ */
+@NotNull
+@NotBlank
+@Data
+@Builder(toBuilder = true)
+public class HttpParams {
+
+ /**
+ * Name of the HttpClient, as found in the HttpClientFactory.
+ */
+ private String clientName;
+
+ /**
+ * URI path.
+ */
+ private String path;
+
+ /**
+ * Amount of time, in seconds to wait for the HTTP request to complete, where zero
+ * indicates that it should wait forever. The default is zero.
+ */
+ @Min(0)
+ @Builder.Default
+ private long timeoutSec = 0;
+
+
+ /**
+ * Validates the parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ return new BeanValidator().validateTop(resultName, this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java
new file mode 100644
index 000000000..3004e1932
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Getter;
+import org.onap.policy.common.parameters.ValidationResult;
+
+/**
+ * Parameter runtime exception, with an associated validation result. This is used to
+ * throw an exception while passing a validation result up the chain.
+ * <p/>
+ * Note: the validation result is <i>not</i> included in the exception message.
+ */
+public class ParameterValidationRuntimeException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ @Getter
+ private final transient ValidationResult result;
+
+
+ public ParameterValidationRuntimeException(ValidationResult result) {
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(String message, ValidationResult result) {
+ super(message);
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(Throwable cause, ValidationResult result) {
+ super(cause);
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(String message, Throwable cause, ValidationResult result) {
+ super(message, cause);
+ this.result = result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java
new file mode 100644
index 000000000..9e6d8a15e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Builder;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+/**
+ * Parameters used by Operators that connect to a server via DMaaP.
+ */
+@NotNull
+@NotBlank
+@Data
+@Builder(toBuilder = true)
+public class TopicParams {
+
+ /**
+ * Name of the target topic end point to which requests should be published.
+ */
+ private String target;
+
+ /**
+ * Source topic end point, from which to read responses.
+ */
+ private String source;
+
+ /**
+ * Amount of time, in seconds to wait for the response, where zero indicates that it
+ * should wait forever. The default is zero.
+ */
+ @Min(0)
+ @Builder.Default
+ private long timeoutSec = 0;
+
+ /**
+ * Validates both the publisher and the subscriber parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ return new BeanValidator().validateTop(resultName, this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java
new file mode 100644
index 000000000..aac2f77b7
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.Future;
+import lombok.NoArgsConstructor;
+
+/**
+ * Manager that manages both futures and listeners. When {@link #stop()} is called, the
+ * listeners are executed and the futures are canceled. The various methods synchronize on
+ * "this" while they manipulate internal data structures.
+ */
+@NoArgsConstructor
+public class FutureManager extends ListenerManager {
+
+ /**
+ * Maps a future to its listener. Records the {@link Runnable} that is passed to
+ * {@link ListenerManager#add(Runnable)} when {@link #add(Future)} is invoked. This is
+ * needed if {@link #remove(Future)} is invoked, so that the same {@link Runnable} is
+ * used each time.
+ */
+ @SuppressWarnings("rawtypes")
+ private final IdentityHashMap<Future, Runnable> future2listener = new IdentityHashMap<>(5);
+
+ /**
+ * Adds a future that is to be canceled when this controller is stopped. Note: if the
+ * controller is already stopped, then the future will be canceled immediately, within
+ * the invoking thread.
+ *
+ * @param future future to be added
+ */
+ public <T> void add(Future<T> future) {
+ Runnable listener = () -> future.cancel(false);
+
+ synchronized (this) {
+ if (future2listener.putIfAbsent(future, listener) != null) {
+ // this future is already in the map, nothing more to do
+ return;
+ }
+
+ if (addOnly(listener)) {
+ // successfully added
+ return;
+ }
+ }
+
+ runListener(listener);
+ }
+
+ /**
+ * Removes a future so that it is not canceled when this controller is stopped.
+ *
+ * @param future future to be removed
+ */
+ public synchronized <T> void remove(Future<T> future) {
+ Runnable listener = future2listener.remove(future);
+ if (listener != null) {
+ remove(listener);
+ }
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+
+ synchronized (this) {
+ future2listener.clear();
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
new file mode 100644
index 000000000..d34a3fb5b
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+/**
+ * Listener manager, used by operations within the pipeline to determine if they should
+ * continue to run. When {@link #stop()} is called, the listeners are executed. The
+ * various methods synchronize on "this" while they manipulate internal data structures.
+ */
+@NoArgsConstructor
+public class ListenerManager {
+
+ @Getter
+ private volatile boolean running = true;
+
+ /**
+ * Listeners to be executed when {@link #stop()} is invoked.
+ */
+ private final IdentityHashMap<Runnable, Void> listeners = new IdentityHashMap<>(5);
+
+ /**
+ * Indicates that operations within the pipeline should stop executing.
+ */
+ public void stop() {
+ ArrayList<Runnable> items;
+
+ synchronized (this) {
+ if (!running) {
+ return;
+ }
+
+ running = false;
+ items = new ArrayList<>(listeners.keySet());
+ listeners.clear();
+ }
+
+ items.forEach(this::runListener);
+ }
+
+ /**
+ * Adds a listener that is to be invoked when this controller is stopped. Note: if the
+ * controller is already stopped, then the listener will be invoked immediately,
+ * within the invoking thread.
+ *
+ * @param listener listener to be added
+ */
+ public void add(Runnable listener) {
+ if (!addOnly(listener)) {
+ runListener(listener);
+ }
+ }
+
+ /**
+ * Adds a listener that is to be invoked when this controller is stopped. Note: if the
+ * controller is already stopped, then the listener will be invoked immediately,
+ * within the invoking thread.
+ *
+ * @param listener listener to be added
+ * @return {@code true} if the the listener was added, {@code false} if it could not
+ * be added because this manager has already been stopped
+ */
+ protected boolean addOnly(Runnable listener) {
+ synchronized (this) {
+ if (running) {
+ listeners.put(listener, null);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Runs a listener, catching any exceptions that it may throw.
+ *
+ * @param listener listener to be executed
+ */
+ protected void runListener(Runnable listener) {
+ // TODO do this asynchronously?
+ Util.logException(listener, "pipeline listener {} threw an exception", listener);
+ }
+
+ /**
+ * Removes a listener so that it is not invoked when this controller is stopped.
+ *
+ * @param listener listener to be removed
+ */
+ public synchronized void remove(Runnable listener) {
+ listeners.remove(listener);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
new file mode 100644
index 000000000..96c8f9e05
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
@@ -0,0 +1,157 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline controller, used by operations within the pipeline to determine if they should
+ * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
+ * pipeline.
+ */
+@NoArgsConstructor
+public class PipelineControllerFuture<T> extends CompletableFuture<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
+
+ /**
+ * Tracks items added to this controller via one of the <i>add</i> methods.
+ */
+ private final FutureManager futures = new FutureManager();
+
+
+ /**
+ * Cancels and stops the pipeline, in that order.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ try {
+ logger.trace("{}: cancel future", ident(this));
+ return super.cancel(mayInterruptIfRunning);
+
+ } finally {
+ futures.stop();
+ }
+ }
+
+ /**
+ * Generates a function that, when invoked, will remove the given future. This is
+ * typically added onto the end of a pipeline via one of the
+ * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that removes the given future
+ */
+ public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
+ return (value, thrown) -> {
+ logger.trace("{}: remove future {}", ident(this), ident(future));
+ remove(future);
+ };
+ }
+
+ /**
+ * Generates a function that, when invoked, will remove the given listener. This is
+ * typically added onto the end of a pipeline via one of the
+ * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that removes the given listener
+ */
+ public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
+ return (value, thrown) -> {
+ logger.trace("{}: remove listener {}", ident(this), ident(listener));
+ remove(listener);
+ };
+ }
+
+ /**
+ * Generates a function that, when invoked, will stop all pipeline listeners and
+ * complete this future. This is typically added onto the end of a pipeline via one of
+ * the {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that stops all pipeline listeners
+ */
+ public BiConsumer<T, Throwable> delayedComplete() {
+ return (value, thrown) -> {
+ if (thrown == null) {
+ logger.trace("{}: complete and stop future", ident(this));
+ complete(value);
+ } else {
+ logger.trace("{}: complete exceptionally and stop future", ident(this));
+ completeExceptionally(thrown);
+ }
+
+ futures.stop();
+ };
+ }
+
+ /**
+ * Adds a function whose return value is to be canceled when this controller is
+ * stopped. Note: if the controller is already stopped, then the function will
+ * <i>not</i> be executed.
+ *
+ * @param futureMaker function to be invoked in the future
+ */
+ public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
+
+ return input -> {
+ if (!isRunning()) {
+ logger.trace("{}: discarded new future", ident(this));
+ return new CompletableFuture<>();
+ }
+
+ CompletableFuture<F> future = futureMaker.apply(input);
+ add(future);
+
+ return future;
+ };
+ }
+
+ public <F> void add(Future<F> future) {
+ logger.trace("{}: add future {}", ident(this), ident(future));
+ futures.add(future);
+ }
+
+ public void add(Runnable listener) {
+ logger.trace("{}: add listener {}", ident(this), ident(listener));
+ futures.add(listener);
+ }
+
+ public boolean isRunning() {
+ return futures.isRunning();
+ }
+
+ public <F> void remove(Future<F> future) {
+ logger.trace("{}: remove future {}", ident(this), ident(future));
+ futures.remove(future);
+ }
+
+ public void remove(Runnable listener) {
+ logger.trace("{}: remove listener {}", ident(this), ident(listener));
+ futures.remove(listener);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
index 88f3c16eb..620950a3c 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Actor
* ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,9 +21,56 @@
package org.onap.policy.controlloop.actorserviceprovider.spi;
+import java.util.Collection;
+
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+
+/**
+ * This is the service interface for defining an Actor used in Control Loop Operational
+ * Policies for performing actions on runtime entities.
+ *
+ * @author pameladragosh
+ *
+ */
+public interface Actor extends Startable, Configurable<Map<String,Object>> {
+
+ /**
+ * Gets the name of the actor.
+ *
+ * @return the actor name
+ */
+ String getName();
+
+ /**
+ * Gets a particular operator.
+ *
+ * @param name name of the operation of interest
+ * @return the desired operation
+ * @throws IllegalArgumentException if no operation by the given name exists
+ */
+ Operator getOperator(String name);
+
+ /**
+ * Gets the supported operations.
+ *
+ * @return the supported operations
+ */
+ public Collection<Operator> getOperators();
+
+ /**
+ * Gets the names of the supported operations.
+ *
+ * @return the names of the supported operations
+ */
+ public Set<String> getOperationNames();
+
-public interface Actor {
+ // TODO old code: remove lines down to **HERE**
String actor();
@@ -33,4 +80,5 @@ public interface Actor {
List<String> recipePayloads(String recipe);
+ // **HERE**
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceProviderTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceProviderTest.java
index 7ab21dece..139c5179b 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceProviderTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceProviderTest.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
- * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,6 +28,8 @@ import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+// TODO combine this with ActorServiceTest
+
public class ActorServiceProviderTest {
private static final String DOROTHY = "Dorothy";
@@ -37,12 +39,12 @@ public class ActorServiceProviderTest {
ActorService actorService = ActorService.getInstance();
assertNotNull(actorService);
- assertEquals(1, actorService.actors().size());
+ assertEquals(1, actorService.getActors().size());
actorService = ActorService.getInstance();
assertNotNull(actorService);
- Actor dummyActor = ActorService.getInstance().actors().get(0);
+ Actor dummyActor = ActorService.getInstance().getActors().iterator().next();
assertNotNull(dummyActor);
assertEquals("DummyActor", dummyActor.actor());
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java
new file mode 100644
index 000000000..851a79129
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java
@@ -0,0 +1,382 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ObjectValidationResult;
+import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+
+public class ActorServiceTest {
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String ACTOR1 = "actor A";
+ private static final String ACTOR2 = "actor B";
+ private static final String ACTOR3 = "actor C";
+ private static final String ACTOR4 = "actor D";
+
+ private Actor actor1;
+ private Actor actor2;
+ private Actor actor3;
+ private Actor actor4;
+
+ private Map<String, Object> sub1;
+ private Map<String, Object> sub2;
+ private Map<String, Object> sub3;
+ private Map<String, Object> sub4;
+ private Map<String, Object> params;
+
+ private ActorService service;
+
+
+ /**
+ * Initializes the fields, including a fully populated {@link #service}.
+ */
+ @Before
+ public void setUp() {
+ actor1 = spy(new ActorImpl(ACTOR1));
+ actor2 = spy(new ActorImpl(ACTOR2));
+ actor3 = spy(new ActorImpl(ACTOR3));
+ actor4 = spy(new ActorImpl(ACTOR4));
+
+ sub1 = Map.of("sub A", "value A");
+ sub2 = Map.of("sub B", "value B");
+ sub3 = Map.of("sub C", "value C");
+ sub4 = Map.of("sub D", "value D");
+
+ params = Map.of(ACTOR1, sub1, ACTOR2, sub2, ACTOR3, sub3, ACTOR4, sub4);
+
+ service = makeService(actor1, actor2, actor3, actor4);
+ }
+
+ @Test
+ public void testActorService() {
+ /*
+ * make a service where actors two and four have names that are duplicates of the
+ * others
+ */
+ actor2 = spy(new ActorImpl(ACTOR1));
+ actor4 = spy(new ActorImpl(ACTOR3));
+
+ service = makeService(actor1, actor2, actor3, actor4);
+
+ assertEquals(2, service.getActorNames().size());
+
+ assertSame(actor1, service.getActor(ACTOR1));
+ assertSame(actor3, service.getActor(ACTOR3));
+ }
+
+ @Test
+ public void testDoStart() {
+ service.configure(params);
+
+ setUpOp("testDoStart", actor -> when(actor.isConfigured()).thenReturn(false), Actor::start);
+
+ /*
+ * Start the service.
+ */
+ service.start();
+ assertTrue(service.isAlive());
+
+ Iterator<Actor> iter = service.getActors().iterator();
+ verify(iter.next()).start();
+ verify(iter.next(), never()).start();
+ verify(iter.next()).start();
+ verify(iter.next()).start();
+
+ // no additional types of operations
+ verify(actor1).configure(any());
+
+ // no other types of operations
+ verify(actor1, never()).stop();
+ verify(actor1, never()).shutdown();
+ }
+
+ @Test
+ public void testDoStop() {
+ service.configure(params);
+ service.start();
+
+ setUpOp("testDoStop", Actor::stop, Actor::stop);
+
+ /*
+ * Stop the service.
+ */
+ service.stop();
+ assertFalse(service.isAlive());
+
+ Iterator<Actor> iter = service.getActors().iterator();
+ verify(iter.next()).stop();
+ verify(iter.next(), times(2)).stop();
+ verify(iter.next()).stop();
+ verify(iter.next()).stop();
+
+ // no additional types of operations
+ verify(actor1).configure(any());
+ verify(actor1).start();
+
+ // no other types of operation
+ verify(actor1, never()).shutdown();
+ }
+
+ @Test
+ public void testDoShutdown() {
+ service.configure(params);
+ service.start();
+
+ setUpOp("testDoShutdown", Actor::shutdown, Actor::shutdown);
+
+ /*
+ * Shut down the service.
+ */
+ service.shutdown();
+ assertFalse(service.isAlive());
+
+ Iterator<Actor> iter = service.getActors().iterator();
+ verify(iter.next()).shutdown();
+ verify(iter.next(), times(2)).shutdown();
+ verify(iter.next()).shutdown();
+ verify(iter.next()).shutdown();
+
+ // no additional types of operations
+ verify(actor1).configure(any());
+ verify(actor1).start();
+
+ // no other types of operation
+ verify(actor1, never()).stop();
+ }
+
+ /**
+ * Applies an operation to the second actor, and then arranges for the third actor to
+ * throw an exception when its operation is performed.
+ *
+ * @param testName test name
+ * @param oper2 operation to apply to the second actor
+ * @param oper3 operation to apply to the third actor
+ */
+ private void setUpOp(String testName, Consumer<Actor> oper2, Consumer<Actor> oper3) {
+ Collection<Actor> actors = service.getActors();
+ assertEquals(testName, 4, actors.size());
+
+ Iterator<Actor> iter = actors.iterator();
+
+ // leave the first alone
+ iter.next();
+
+ // apply oper2 to the second actor
+ oper2.accept(iter.next());
+
+ // throw an exception in the third
+ oper3.accept(doThrow(new IllegalStateException(EXPECTED_EXCEPTION)).when(iter.next()));
+
+ // leave the fourth alone
+ iter.next();
+ }
+
+ @Test
+ public void testGetInstance() {
+ service = ActorService.getInstance();
+ assertNotNull(service);
+
+ assertSame(service, ActorService.getInstance());
+ }
+
+ @Test
+ public void testGetActor() {
+ assertSame(actor1, service.getActor(ACTOR1));
+ assertSame(actor3, service.getActor(ACTOR3));
+
+ assertThatIllegalArgumentException().isThrownBy(() -> service.getActor("unknown actor"));
+ }
+
+ @Test
+ public void testGetActors() {
+ // @formatter:off
+ assertEquals("[actor A, actor B, actor C, actor D]",
+ service.getActors().stream()
+ .map(Actor::getName)
+ .sorted()
+ .collect(Collectors.toList())
+ .toString());
+ // @formatter:on
+ }
+
+ @Test
+ public void testGetActorNames() {
+ // @formatter:off
+ assertEquals("[actor A, actor B, actor C, actor D]",
+ service.getActorNames().stream()
+ .sorted()
+ .collect(Collectors.toList())
+ .toString());
+ // @formatter:on
+ }
+
+ @Test
+ public void testDoConfigure() {
+ service.configure(params);
+ assertTrue(service.isConfigured());
+
+ verify(actor1).configure(sub1);
+ verify(actor2).configure(sub2);
+ verify(actor3).configure(sub3);
+ verify(actor4).configure(sub4);
+
+ // no other types of operations
+ verify(actor1, never()).start();
+ verify(actor1, never()).stop();
+ verify(actor1, never()).shutdown();
+ }
+
+ /**
+ * Tests doConfigure() where actors throw parameter validation and runtime exceptions.
+ */
+ @Test
+ public void testDoConfigureExceptions() {
+ makeValidException(actor1);
+ makeRuntimeException(actor2);
+ makeValidException(actor3);
+
+ service.configure(params);
+ assertTrue(service.isConfigured());
+ }
+
+ /**
+ * Tests doConfigure(). Arranges for the following:
+ * <ul>
+ * <li>one actor is configured, but has parameters</li>
+ * <li>another actor is configured, but has no parameters</li>
+ * <li>another actor has no parameters and is not configured</li>
+ * </ul>
+ */
+ @Test
+ public void testDoConfigureConfigure() {
+ // need mutable parameters
+ params = new TreeMap<>(params);
+
+ // configure one actor
+ actor1.configure(sub1);
+
+ // configure another and remove its parameters
+ actor2.configure(sub2);
+ params.remove(ACTOR2);
+
+ // create a new, unconfigured actor
+ ActorImpl actor5 = spy(new ActorImpl("UNCONFIGURED"));
+ service = makeService(actor1, actor2, actor3, actor4, actor5);
+
+ /*
+ * Configure it.
+ */
+ service.configure(params);
+ assertTrue(service.isConfigured());
+
+ // this should have been configured again
+ verify(actor1, times(2)).configure(sub1);
+
+ // no parameters, so this should not have been configured again
+ verify(actor2).configure(sub2);
+
+ // these were only configured once
+ verify(actor3).configure(sub3);
+ verify(actor4).configure(sub4);
+
+ // never configured
+ verify(actor5, never()).configure(any());
+ assertFalse(actor5.isConfigured());
+
+ // start and verify that all are started except for the last
+ service.start();
+ verify(actor1).start();
+ verify(actor2).start();
+ verify(actor3).start();
+ verify(actor4).start();
+ verify(actor5, never()).start();
+ }
+
+ /**
+ * Arranges for an actor to throw a validation exception when
+ * {@link Actor#configure(Map)} is invoked.
+ *
+ * @param actor actor of interest
+ */
+ private void makeValidException(Actor actor) {
+ ParameterValidationRuntimeException ex = new ParameterValidationRuntimeException(
+ new ObjectValidationResult(actor.getName(), null, ValidationStatus.INVALID, "null"));
+ doThrow(ex).when(actor).configure(any());
+ }
+
+ /**
+ * Arranges for an actor to throw a runtime exception when
+ * {@link Actor#configure(Map)} is invoked.
+ *
+ * @param actor actor of interest
+ */
+ private void makeRuntimeException(Actor actor) {
+ IllegalStateException ex = new IllegalStateException(EXPECTED_EXCEPTION);
+ doThrow(ex).when(actor).configure(any());
+ }
+
+ @Test
+ public void testLoadActors() {
+ assertFalse(ActorService.getInstance().getActors().isEmpty());
+ assertNotNull(ActorService.getInstance().getActor("DummyActor"));
+ }
+
+ /**
+ * Makes an actor service whose {@link ActorService#loadActors()} method returns the
+ * given actors.
+ *
+ * @param actors actors to be returned
+ * @return a new actor service
+ */
+ private ActorService makeService(Actor... actors) {
+ return new ActorService() {
+ @Override
+ protected Iterable<Actor> loadActors() {
+ return Arrays.asList(actors);
+ }
+ };
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentStringTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentStringTest.java
new file mode 100644
index 000000000..5b9856f41
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentStringTest.java
@@ -0,0 +1,93 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class DelayedIdentStringTest {
+
+ private int countToStringCalls;
+ private Object object;
+ private DelayedIdentString delay;
+
+ /**
+ * Initializes fields, including {@link #delay}.
+ */
+ @Before
+ public void setUp() {
+ countToStringCalls = 0;
+
+ object = new Object() {
+ @Override
+ public String toString() {
+ ++countToStringCalls;
+ return super.toString();
+ }
+ };
+
+ delay = new DelayedIdentString(object);
+ }
+
+ @Test
+ public void testToString() {
+ String delayed = delay.toString();
+ assertEquals(1, countToStringCalls);
+
+ String real = object.toString();
+ assertNotEquals(real, delayed);
+
+ assertThat(delayed).startsWith("@");
+ assertTrue(delayed.length() > 1);
+
+ // test case where the object is null
+ assertEquals(DelayedIdentString.NULL_STRING, new DelayedIdentString(null).toString());
+
+ // test case where the object returns null from toString()
+ object = new Object() {
+ @Override
+ public String toString() {
+ return null;
+ }
+ };
+ assertEquals(DelayedIdentString.NULL_STRING, new DelayedIdentString(object).toString());
+
+ // test case where the object's toString() does not include "@"
+ object = new Object() {
+ @Override
+ public String toString() {
+ return "some text";
+ }
+ };
+ assertEquals(object.toString(), new DelayedIdentString(object).toString());
+ }
+
+ @Test
+ public void testDelayedIdentString() {
+ // should not have called the object's toString() method yet
+ assertEquals(0, countToStringCalls);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DummyActor.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DummyActor.java
index e9cf238e2..76cadffa6 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DummyActor.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/DummyActor.java
@@ -4,6 +4,7 @@
* ================================================================================
* Copyright (C) 2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,10 +24,14 @@ package org.onap.policy.controlloop.actorserviceprovider;
import java.util.ArrayList;
import java.util.List;
+import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
-import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+public class DummyActor extends ActorImpl {
+
+ public DummyActor() {
+ super(DummyActor.class.getSimpleName());
+ }
-public class DummyActor implements Actor {
@Override
public String actor() {
return this.getClass().getSimpleName();
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java
new file mode 100644
index 000000000..c652e8374
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Builder;
+import lombok.Data;
+import org.junit.Test;
+
+public class UtilTest {
+
+ @Test
+ public void testIdent() {
+ Object object = new Object();
+ String result = Util.ident(object).toString();
+
+ assertNotEquals(object.toString(), result);
+ assertThat(result).startsWith("@");
+ assertTrue(result.length() > 1);
+ }
+
+ @Test
+ public void testLogException() {
+ // no exception, no log
+ AtomicInteger count = new AtomicInteger();
+ Util.logException(() -> count.incrementAndGet(), "no error");
+ assertEquals(1, count.get());
+
+ // with an exception
+ Runnable runnable = () -> {
+ count.incrementAndGet();
+ throw new IllegalStateException("expected exception");
+ };
+
+ Util.logException(runnable, "error with no args");
+ Util.logException(runnable, "error {} {} arg(s)", "with", 1);
+ }
+
+ @Test
+ public void testTranslate() {
+ // Abc => Abc
+ final Abc abc = Abc.builder().intValue(1).strValue("hello").anotherString("another").build();
+ Abc abc2 = Util.translate("abc to abc", abc, Abc.class);
+ assertEquals(abc, abc2);
+
+ // Abc => Similar
+ Similar sim = Util.translate("abc to similar", abc, Similar.class);
+ assertEquals(abc.getIntValue(), sim.getIntValue());
+ assertEquals(abc.getStrValue(), sim.getStrValue());
+
+ // Abc => Map
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = Util.translate("abc to map", abc, TreeMap.class);
+ assertEquals("{anotherString=another, intValue=1, strValue=hello}", map.toString());
+
+ // Map => Map
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map2 = Util.translate("map to map", map, LinkedHashMap.class);
+ assertEquals(map.toString(), map2.toString());
+
+ // Map => Abc
+ abc2 = Util.translate("map to abc", map, Abc.class);
+ assertEquals(abc, abc2);
+ }
+
+ @Test
+ public void testTranslateToMap() {
+ assertNull(Util.translateToMap("map: null", null));
+
+ // Abc => Map
+ final Abc abc = Abc.builder().intValue(2).strValue("world").anotherString("some").build();
+ Map<String, Object> map = new TreeMap<>(Util.translateToMap("map: abc to map", abc));
+ assertEquals("{anotherString=some, intValue=2, strValue=world}", map.toString());
+
+ // Map => Map
+ Map<String, Object> map2 = Util.translateToMap("map: map to map", map);
+ assertEquals(map.toString(), map2.toString());
+
+ assertThatIllegalArgumentException().isThrownBy(() -> Util.translateToMap("map: string", "some string"))
+ .withMessageContaining("map: string");
+ }
+
+ @Data
+ @Builder
+ public static class Abc {
+ private int intValue;
+ private String strValue;
+ private String anotherString;
+ }
+
+ // this shares some fields with Abc so the data should transfer
+ @Data
+ @Builder
+ public static class Similar {
+ private int intValue;
+ private String strValue;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java
new file mode 100644
index 000000000..fcc3fb12e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java
@@ -0,0 +1,60 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.controlloop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+
+public class ControlLoopEventContextTest {
+
+ private VirtualControlLoopEvent event;
+ private ControlLoopEventContext context;
+
+ @Before
+ public void setUp() {
+ event = new VirtualControlLoopEvent();
+ context = new ControlLoopEventContext(event);
+ }
+
+ @Test
+ public void testControlLoopEventContext() {
+ assertSame(event, context.getEvent());
+ }
+
+ @Test
+ public void testContains_testGetProperty_testSetProperty() {
+ context.setProperty("abc", "a string");
+ context.setProperty("def", 100);
+
+ assertFalse(context.contains("ghi"));
+
+ String strValue = context.getProperty("abc");
+ assertEquals("a string", strValue);
+
+ int intValue = context.getProperty("def");
+ assertEquals(100, intValue);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java
new file mode 100644
index 000000000..7e0c35a3f
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java
@@ -0,0 +1,379 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ObjectValidationResult;
+import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+
+public class ActorImplTest {
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String ACTOR_NAME = "my-actor";
+ private static final String OPER1 = "add";
+ private static final String OPER2 = "subtract";
+ private static final String OPER3 = "multiply";
+ private static final String OPER4 = "divide";
+
+ private MyOper oper1;
+ private MyOper oper2;
+ private MyOper oper3;
+ private MyOper oper4;
+
+ private Map<String, Object> sub1;
+ private Map<String, Object> sub2;
+ private Map<String, Object> sub3;
+ private Map<String, Object> sub4;
+ private Map<String, Object> params;
+
+ private ActorImpl actor;
+
+
+ /**
+ * Initializes the fields, including a fully populated {@link #actor}.
+ */
+ @Before
+ public void setUp() {
+ oper1 = spy(new MyOper(OPER1));
+ oper2 = spy(new MyOper(OPER2));
+ oper3 = spy(new MyOper(OPER3));
+ oper4 = spy(new MyOper(OPER4));
+
+ sub1 = Map.of("sub A", "value A");
+ sub2 = Map.of("sub B", "value B");
+ sub3 = Map.of("sub C", "value C");
+ sub4 = Map.of("sub D", "value D");
+
+ params = Map.of(OPER1, sub1, OPER2, sub2, OPER3, sub3, OPER4, sub4);
+
+ actor = makeActor(oper1, oper2, oper3, oper4);
+ }
+
+ @Test
+ public void testActorImpl_testGetName() {
+ assertEquals(ACTOR_NAME, actor.getName());
+ assertEquals(4, actor.getOperationNames().size());
+ }
+
+ @Test
+ public void testDoStart() {
+ actor.configure(params);
+ assertEquals(4, actor.getOperationNames().size());
+
+ /*
+ * arrange for second operator to be unconfigured and the third operator to throw
+ * an exception
+ */
+ Iterator<Operator> iter = actor.getOperators().iterator();
+ iter.next();
+ when(iter.next().isConfigured()).thenReturn(false);
+ when(iter.next().start()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
+
+ /*
+ * Start the actor.
+ */
+ actor.start();
+ assertTrue(actor.isAlive());
+
+ iter = actor.getOperators().iterator();
+ verify(iter.next()).start();
+ // this one isn't configured, so shouldn't attempt to start it
+ verify(iter.next(), never()).start();
+ // this one threw an exception
+ iter.next();
+ verify(iter.next()).start();
+
+ // no other types of operations
+ verify(oper1, never()).stop();
+ verify(oper1, never()).shutdown();
+ }
+
+ @Test
+ public void testDoStop() {
+ actor.configure(params);
+ actor.start();
+ assertEquals(4, actor.getOperationNames().size());
+
+ // arrange for second operator to throw an exception
+ Iterator<Operator> iter = actor.getOperators().iterator();
+ iter.next();
+ when(iter.next().stop()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
+
+ /*
+ * Stop the actor.
+ */
+ actor.stop();
+ assertFalse(actor.isAlive());
+
+ iter = actor.getOperators().iterator();
+ verify(iter.next()).stop();
+ // this one threw an exception
+ iter.next();
+ verify(iter.next()).stop();
+ verify(iter.next()).stop();
+
+ // no additional types of operations
+ verify(oper1).configure(any());
+ verify(oper1).start();
+
+ // no other types of operation
+ verify(oper1, never()).shutdown();
+ }
+
+ @Test
+ public void testDoShutdown() {
+ actor.configure(params);
+ actor.start();
+ assertEquals(4, actor.getOperationNames().size());
+
+ // arrange for second operator to throw an exception
+ Iterator<Operator> iter = actor.getOperators().iterator();
+ iter.next();
+ doThrow(new IllegalStateException(EXPECTED_EXCEPTION)).when(iter.next()).shutdown();
+
+ /*
+ * Stop the actor.
+ */
+ actor.shutdown();
+ assertFalse(actor.isAlive());
+
+ iter = actor.getOperators().iterator();
+ verify(iter.next()).shutdown();
+ // this one threw an exception
+ iter.next();
+ verify(iter.next()).shutdown();
+ verify(iter.next()).shutdown();
+
+ // no additional types of operations
+ verify(oper1).configure(any());
+ verify(oper1).start();
+
+ // no other types of operation
+ verify(oper1, never()).stop();
+ }
+
+ @Test
+ public void testSetOperators() {
+ // cannot set operators if already configured
+ actor.configure(params);
+ assertThatIllegalStateException().isThrownBy(() -> actor.setOperators(Collections.emptyList()));
+
+ /*
+ * make an actor where operators two and four have names that are duplicates of
+ * the others
+ */
+ oper2 = spy(new MyOper(OPER1));
+ oper4 = spy(new MyOper(OPER3));
+
+ actor = makeActor(oper1, oper2, oper3, oper4);
+
+ assertEquals(2, actor.getOperationNames().size());
+
+ assertSame(oper1, actor.getOperator(OPER1));
+ assertSame(oper3, actor.getOperator(OPER3));
+ }
+
+ @Test
+ public void testGetOperator() {
+ assertSame(oper1, actor.getOperator(OPER1));
+ assertSame(oper3, actor.getOperator(OPER3));
+
+ assertThatIllegalArgumentException().isThrownBy(() -> actor.getOperator("unknown name"));
+ }
+
+ @Test
+ public void testGetOperators() {
+ // @formatter:off
+ assertEquals("[add, divide, multiply, subtract]",
+ actor.getOperators().stream()
+ .map(Operator::getName)
+ .sorted()
+ .collect(Collectors.toList())
+ .toString());
+ // @formatter:on
+ }
+
+ @Test
+ public void testGetOperationNames() {
+ // @formatter:off
+ assertEquals("[add, divide, multiply, subtract]",
+ actor.getOperationNames().stream()
+ .sorted()
+ .collect(Collectors.toList())
+ .toString());
+ // @formatter:on
+ }
+
+ @Test
+ public void testDoConfigure() {
+ actor.configure(params);
+ assertTrue(actor.isConfigured());
+
+ verify(oper1).configure(sub1);
+ verify(oper2).configure(sub2);
+ verify(oper3).configure(sub3);
+ verify(oper4).configure(sub4);
+
+ // no other types of operations
+ verify(oper1, never()).start();
+ verify(oper1, never()).stop();
+ verify(oper1, never()).shutdown();
+ }
+
+ /**
+ * Tests doConfigure() where operators throw parameter validation and runtime
+ * exceptions.
+ */
+ @Test
+ public void testDoConfigureExceptions() {
+ makeValidException(oper1);
+ makeRuntimeException(oper2);
+ makeValidException(oper3);
+
+ actor.configure(params);
+ assertTrue(actor.isConfigured());
+ }
+
+ /**
+ * Tests doConfigure(). Arranges for the following:
+ * <ul>
+ * <li>one operator is configured, but has parameters</li>
+ * <li>another operator is configured, but has no parameters</li>
+ * <li>another operator has no parameters and is not configured</li>
+ * </ul>
+ */
+ @Test
+ public void testDoConfigureConfigure() {
+ // need mutable parameters
+ params = new TreeMap<>(params);
+
+ // configure one operator
+ oper1.configure(sub1);
+
+ // configure another and remove its parameters
+ oper2.configure(sub2);
+ params.remove(OPER2);
+
+ // create a new, unconfigured actor
+ Operator oper5 = spy(new MyOper("UNCONFIGURED"));
+ actor = makeActor(oper1, oper2, oper3, oper4, oper5);
+
+ /*
+ * Configure it.
+ */
+ actor.configure(params);
+ assertTrue(actor.isConfigured());
+
+ // this should have been configured again
+ verify(oper1, times(2)).configure(sub1);
+
+ // no parameters, so this should not have been configured again
+ verify(oper2).configure(sub2);
+
+ // these were only configured once
+ verify(oper3).configure(sub3);
+ verify(oper4).configure(sub4);
+
+ // never configured
+ verify(oper5, never()).configure(any());
+ assertFalse(oper5.isConfigured());
+
+ // start and verify that all are started except for the last
+ actor.start();
+ verify(oper1).start();
+ verify(oper2).start();
+ verify(oper3).start();
+ verify(oper4).start();
+ verify(oper5, never()).start();
+ }
+
+ /**
+ * Arranges for an operator to throw a validation exception when
+ * {@link Operator#configure(Map)} is invoked.
+ *
+ * @param oper operator of interest
+ */
+ private void makeValidException(Operator oper) {
+ ParameterValidationRuntimeException ex = new ParameterValidationRuntimeException(
+ new ObjectValidationResult(actor.getName(), null, ValidationStatus.INVALID, "null"));
+ doThrow(ex).when(oper).configure(any());
+ }
+
+ /**
+ * Arranges for an operator to throw a runtime exception when
+ * {@link Operator#configure(Map)} is invoked.
+ *
+ * @param oper operator of interest
+ */
+ private void makeRuntimeException(Operator oper) {
+ IllegalStateException ex = new IllegalStateException(EXPECTED_EXCEPTION);
+ doThrow(ex).when(oper).configure(any());
+ }
+
+ @Test
+ public void testMakeOperatorParameters() {
+ actor.configure(params);
+
+ // each operator should have received its own parameters
+ verify(oper1).configure(sub1);
+ verify(oper2).configure(sub2);
+ verify(oper3).configure(sub3);
+ verify(oper4).configure(sub4);
+ }
+
+ /**
+ * Makes an actor with the given operators.
+ *
+ * @param operators associated operators
+ * @return a new actor
+ */
+ private ActorImpl makeActor(Operator... operators) {
+ return new ActorImpl(ACTOR_NAME, operators);
+ }
+
+ private static class MyOper extends OperatorPartial implements Operator {
+
+ public MyOper(String name) {
+ super(ACTOR_NAME, name);
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java
new file mode 100644
index 000000000..864ac829a
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java
@@ -0,0 +1,978 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.Setter;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.PolicyResult;
+
+public class OperatorPartialTest {
+ private static final int MAX_PARALLEL_REQUESTS = 10;
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String ACTOR = "my-actor";
+ private static final String OPERATOR = "my-operator";
+ private static final String TARGET = "my-target";
+ private static final int TIMEOUT = 1000;
+ private static final UUID REQ_ID = UUID.randomUUID();
+
+ private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
+ .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
+
+ private static final List<String> FAILURE_STRINGS =
+ FAILURE_RESULTS.stream().map(Object::toString).collect(Collectors.toList());
+
+ private VirtualControlLoopEvent event;
+ private Map<String, Object> config;
+ private ControlLoopEventContext context;
+ private MyExec executor;
+ private Policy policy;
+ private ControlLoopOperationParams params;
+
+ private MyOper oper;
+
+ private int numStart;
+ private int numEnd;
+
+ private Instant tstart;
+
+ private ControlLoopOperation opstart;
+ private ControlLoopOperation opend;
+
+ /**
+ * Initializes the fields, including {@link #oper}.
+ */
+ @Before
+ public void setUp() {
+ event = new VirtualControlLoopEvent();
+ event.setRequestId(REQ_ID);
+
+ config = new TreeMap<>();
+ context = new ControlLoopEventContext(event);
+ executor = new MyExec();
+
+ policy = new Policy();
+ policy.setActor(ACTOR);
+ policy.setRecipe(OPERATOR);
+ policy.setTimeout(TIMEOUT);
+
+ params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
+ .executor(executor).policy(policy).startCallback(this::starter).target(TARGET).build();
+
+ oper = new MyOper();
+ oper.configure(new TreeMap<>());
+ oper.start();
+
+ tstart = null;
+
+ opstart = null;
+ opend = null;
+ }
+
+ @Test
+ public void testOperatorPartial_testGetActorName_testGetName() {
+ assertEquals(ACTOR, oper.getActorName());
+ assertEquals(OPERATOR, oper.getName());
+ assertEquals(ACTOR + "." + OPERATOR, oper.getFullName());
+ }
+
+ @Test
+ public void testDoStart() {
+ oper = spy(new MyOper());
+
+ oper.configure(config);
+ oper.start();
+
+ verify(oper).doStart();
+
+ // others should not have been invoked
+ verify(oper, never()).doStop();
+ verify(oper, never()).doShutdown();
+ }
+
+ @Test
+ public void testDoStop() {
+ oper = spy(new MyOper());
+
+ oper.configure(config);
+ oper.start();
+ oper.stop();
+
+ verify(oper).doStop();
+
+ // should not have been re-invoked
+ verify(oper).doStart();
+
+ // others should not have been invoked
+ verify(oper, never()).doShutdown();
+ }
+
+ @Test
+ public void testDoShutdown() {
+ oper = spy(new MyOper());
+
+ oper.configure(config);
+ oper.start();
+ oper.shutdown();
+
+ verify(oper).doShutdown();
+
+ // should not have been re-invoked
+ verify(oper).doStart();
+
+ // others should not have been invoked
+ verify(oper, never()).doStop();
+ }
+
+ @Test
+ public void testStartOperation_testVerifyRunning() {
+ verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS);
+ }
+
+ /**
+ * Tests startOperation() when the operator is not running.
+ */
+ @Test
+ public void testStartOperationNotRunning() {
+ // use a new operator, one that hasn't been started yet
+ oper = new MyOper();
+ oper.configure(new TreeMap<>());
+
+ assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
+ }
+
+ /**
+ * Tests startOperation() when the operation has a preprocessor.
+ */
+ @Test
+ public void testStartOperationWithPreprocessor_testStartPreprocessor() {
+ AtomicInteger count = new AtomicInteger();
+
+ // @formatter:off
+ Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
+ oper -> CompletableFuture.supplyAsync(() -> {
+ count.incrementAndGet();
+ oper.setOutcome(PolicyResult.SUCCESS.toString());
+ return oper;
+ }, executor);
+ // @formatter:on
+
+ oper.setPreProcessor(preproc);
+
+ verifyRun("testStartOperationWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
+
+ assertEquals(1, count.get());
+ }
+
+ /**
+ * Tests startOperation() with multiple running requests.
+ */
+ @Test
+ public void testStartOperationMultiple() {
+ for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
+ oper.startOperation(params);
+ }
+
+ assertTrue(executor.runAll());
+
+ assertNotNull(opstart);
+ assertNotNull(opend);
+ assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome());
+
+ assertEquals(MAX_PARALLEL_REQUESTS, numStart);
+ assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
+ assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
+ }
+
+ /**
+ * Tests startPreprocessor() when the preprocessor returns a failure.
+ */
+ @Test
+ public void testStartPreprocessorFailure() {
+ // arrange for the preprocessor to return a failure
+ oper.setPreProcessor(oper -> {
+ oper.setOutcome(PolicyResult.FAILURE_GUARD.toString());
+ return CompletableFuture.completedFuture(oper);
+ });
+
+ verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
+ }
+
+ /**
+ * Tests startPreprocessor() when the preprocessor throws an exception.
+ */
+ @Test
+ public void testStartPreprocessorException() {
+ // arrange for the preprocessor to throw an exception
+ oper.setPreProcessor(oper -> {
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ });
+
+ verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
+ }
+
+ /**
+ * Tests startPreprocessor() when the pipeline is not running.
+ */
+ @Test
+ public void testStartPreprocessorNotRunning() {
+ // arrange for the preprocessor to return success, which will be ignored
+ oper.setPreProcessor(oper -> {
+ oper.setOutcome(PolicyResult.SUCCESS.toString());
+ return CompletableFuture.completedFuture(oper);
+ });
+
+ oper.startOperation(params).cancel(false);
+ assertTrue(executor.runAll());
+
+ assertNull(opstart);
+ assertNull(opend);
+
+ assertEquals(0, numStart);
+ assertEquals(0, oper.getCount());
+ assertEquals(0, numEnd);
+ }
+
+ /**
+ * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
+ */
+ @Test
+ public void testStartPreprocessorBuilderException() {
+ oper = new MyOper() {
+ @Override
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
+ ControlLoopOperationParams params) {
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ }
+ };
+
+ oper.configure(new TreeMap<>());
+ oper.start();
+
+ assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
+
+ // should be nothing in the queue
+ assertEquals(0, executor.getQueueLength());
+ }
+
+ @Test
+ public void testDoPreprocessorAsFuture() {
+ assertNull(oper.doPreprocessorAsFuture(params));
+ }
+
+ @Test
+ public void testStartOperationOnly_testDoOperationAsFuture() {
+ oper.startOperation(params);
+ assertTrue(executor.runAll());
+
+ assertEquals(1, oper.getCount());
+ }
+
+ /**
+ * Tests startOperationOnce() when
+ * {@link OperatorPartial#doOperationAsFuture(ControlLoopOperationParams)} throws an
+ * exception.
+ */
+ @Test
+ public void testStartOperationOnceBuilderException() {
+ oper = new MyOper() {
+ @Override
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
+ ControlLoopOperationParams params, int attempt) {
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ }
+ };
+
+ oper.configure(new TreeMap<>());
+ oper.start();
+
+ assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
+
+ // should be nothing in the queue
+ assertEquals(0, executor.getQueueLength());
+ }
+
+ @Test
+ public void testIsSuccess() {
+ ControlLoopOperation outcome = new ControlLoopOperation();
+
+ outcome.setOutcome(PolicyResult.SUCCESS.toString());
+ assertTrue(oper.isSuccess(outcome));
+
+ for (String failure : FAILURE_STRINGS) {
+ outcome.setOutcome(failure);
+ assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
+ }
+ }
+
+ @Test
+ public void testIsActorFailed() {
+ assertFalse(oper.isActorFailed(null));
+
+ ControlLoopOperation outcome = params.makeOutcome();
+
+ // incorrect outcome
+ outcome.setOutcome(PolicyResult.SUCCESS.toString());
+ assertFalse(oper.isActorFailed(outcome));
+
+ outcome.setOutcome(PolicyResult.FAILURE_RETRIES.toString());
+ assertFalse(oper.isActorFailed(outcome));
+
+ // correct outcome
+ outcome.setOutcome(PolicyResult.FAILURE.toString());
+
+ // incorrect actor
+ outcome.setActor(TARGET);
+ assertFalse(oper.isActorFailed(outcome));
+ outcome.setActor(null);
+ assertFalse(oper.isActorFailed(outcome));
+ outcome.setActor(ACTOR);
+
+ // incorrect operation
+ outcome.setOperation(TARGET);
+ assertFalse(oper.isActorFailed(outcome));
+ outcome.setOperation(null);
+ assertFalse(oper.isActorFailed(outcome));
+ outcome.setOperation(OPERATOR);
+
+ // correct values
+ assertTrue(oper.isActorFailed(outcome));
+ }
+
+ @Test
+ public void testDoOperation() {
+ /*
+ * Use an operator that doesn't override doOperation().
+ */
+ OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {};
+
+ oper2.configure(new TreeMap<>());
+ oper2.start();
+
+ oper2.startOperation(params);
+ assertTrue(executor.runAll());
+
+ assertNotNull(opend);
+ assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), opend.getOutcome());
+ }
+
+ @Test
+ public void testTimeout() throws Exception {
+
+ // use a real executor
+ params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
+
+ // trigger timeout very quickly
+ oper = new MyOper() {
+ @Override
+ protected long getTimeOutMillis(Policy policy) {
+ return 1;
+ }
+
+ @Override
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
+ ControlLoopOperationParams params, int attempt) {
+
+ return outcome -> {
+ ControlLoopOperation outcome2 = params.makeOutcome();
+ outcome2.setOutcome(PolicyResult.SUCCESS.toString());
+
+ /*
+ * Create an incomplete future that will timeout after the operation's
+ * timeout. If it fires before the other timer, then it will return a
+ * SUCCESS outcome.
+ */
+ CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>();
+ future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
+ params.getExecutor());
+
+ return future;
+ };
+ }
+ };
+
+ oper.configure(new TreeMap<>());
+ oper.start();
+
+ assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), oper.startOperation(params).get().getOutcome());
+ }
+
+ /**
+ * Verifies that the timer doesn't encompass the preprocessor and doesn't stop the
+ * operation once the preprocessor completes.
+ */
+ @Test
+ public void testTimeoutInPreprocessor() throws Exception {
+
+ // use a real executor
+ params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
+
+ // trigger timeout very quickly
+ oper = new MyOper() {
+ @Override
+ protected long getTimeOutMillis(Policy policy) {
+ return 10;
+ }
+
+ @Override
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
+ ControlLoopOperationParams params) {
+
+ return outcome -> {
+ outcome.setOutcome(PolicyResult.SUCCESS.toString());
+
+ /*
+ * Create an incomplete future that will timeout after the operation's
+ * timeout. If it fires before the other timer, then it will return a
+ * SUCCESS outcome.
+ */
+ CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>();
+ future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome,
+ params.getExecutor());
+
+ return future;
+ };
+ }
+ };
+
+ oper.configure(new TreeMap<>());
+ oper.start();
+
+ ControlLoopOperation result = oper.startOperation(params).get();
+ assertEquals(PolicyResult.SUCCESS.toString(), result.getOutcome());
+
+ assertNotNull(opstart);
+ assertNotNull(opend);
+ assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome());
+
+ assertEquals(1, numStart);
+ assertEquals(1, oper.getCount());
+ assertEquals(1, numEnd);
+ }
+
+ /**
+ * Tests retry functions, when the count is set to zero and retries are exhausted.
+ */
+ @Test
+ public void testSetRetryFlag_testRetryOnFailure_ZeroRetries() {
+ policy.setRetry(0);
+ oper.setMaxFailures(10);
+
+ verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
+ }
+
+ /**
+ * Tests retry functions, when the count is null and retries are exhausted.
+ */
+ @Test
+ public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
+ policy.setRetry(null);
+ oper.setMaxFailures(10);
+
+ verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
+ }
+
+ /**
+ * Tests retry functions, when retries are exhausted.
+ */
+ @Test
+ public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
+ final int maxRetries = 3;
+ policy.setRetry(maxRetries);
+ oper.setMaxFailures(10);
+
+ verifyRun("testVerifyRunningWhenNot", maxRetries + 1, maxRetries + 1, PolicyResult.FAILURE_RETRIES);
+ }
+
+ /**
+ * Tests retry functions, when a success follows some retries.
+ */
+ @Test
+ public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
+ policy.setRetry(10);
+
+ final int maxFailures = 3;
+ oper.setMaxFailures(maxFailures);
+
+ verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
+ PolicyResult.SUCCESS);
+ }
+
+ /**
+ * Tests retry functions, when the outcome is {@code null}.
+ */
+ @Test
+ public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
+
+ // arrange to return null from doOperation()
+ oper = new MyOper() {
+ @Override
+ protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
+ ControlLoopOperation operation) {
+
+ // update counters
+ super.doOperation(params, attempt, operation);
+ return null;
+ }
+ };
+
+ oper.configure(new TreeMap<>());
+ oper.start();
+
+ verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
+ }
+
+ @Test
+ public void testGetActorOutcome() {
+ assertNull(oper.getActorOutcome(null));
+
+ ControlLoopOperation outcome = params.makeOutcome();
+ outcome.setOutcome(TARGET);
+
+ // wrong actor - should be null
+ outcome.setActor(null);
+ assertNull(oper.getActorOutcome(outcome));
+ outcome.setActor(TARGET);
+ assertNull(oper.getActorOutcome(outcome));
+ outcome.setActor(ACTOR);
+
+ // wrong operation - should be null
+ outcome.setOperation(null);
+ assertNull(oper.getActorOutcome(outcome));
+ outcome.setOperation(TARGET);
+ assertNull(oper.getActorOutcome(outcome));
+ outcome.setOperation(OPERATOR);
+
+ assertEquals(TARGET, oper.getActorOutcome(outcome));
+ }
+
+ @Test
+ public void testOnSuccess() throws Exception {
+ AtomicInteger count = new AtomicInteger();
+
+ final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep = oper -> {
+ count.incrementAndGet();
+ return CompletableFuture.completedFuture(oper);
+ };
+
+ // pass it a null outcome
+ ControlLoopOperation outcome = oper.onSuccess(params, nextStep).apply(null).get();
+ assertNotNull(outcome);
+ assertEquals(PolicyResult.FAILURE.toString(), outcome.getOutcome());
+ assertEquals(0, count.get());
+
+ // pass it an unpopulated (i.e., failed) outcome
+ outcome = new ControlLoopOperation();
+ assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get());
+ assertEquals(0, count.get());
+
+ // pass it a successful outcome
+ outcome = params.makeOutcome();
+ outcome.setOutcome(PolicyResult.SUCCESS.toString());
+ assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get());
+ assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome());
+ assertEquals(1, count.get());
+ }
+
+ /**
+ * Tests onSuccess() and handleFailure() when the outcome is a success.
+ */
+ @Test
+ public void testOnSuccessTrue_testHandleFailureTrue() {
+ // arrange to return a success from the preprocessor
+ oper.setPreProcessor(oper -> {
+ oper.setOutcome(PolicyResult.SUCCESS.toString());
+ return CompletableFuture.completedFuture(oper);
+ });
+
+ verifyRun("testOnSuccessTrue_testHandleFailureTrue", 1, 1, PolicyResult.SUCCESS);
+ }
+
+ /**
+ * Tests onSuccess() and handleFailure() when the outcome is <i>not</i> a success.
+ */
+ @Test
+ public void testOnSuccessFalse_testHandleFailureFalse() throws Exception {
+ // arrange to return a failure from the preprocessor
+ oper.setPreProcessor(oper -> {
+ oper.setOutcome(PolicyResult.FAILURE.toString());
+ return CompletableFuture.completedFuture(oper);
+ });
+
+ verifyRun("testOnSuccessFalse_testHandleFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
+ }
+
+ /**
+ * Tests onSuccess() and handleFailure() when the outcome is {@code null}.
+ */
+ @Test
+ public void testOnSuccessFalse_testHandleFailureNull() throws Exception {
+ // arrange to return null from the preprocessor
+ oper.setPreProcessor(oper -> {
+ return CompletableFuture.completedFuture(null);
+ });
+
+ verifyRun("testOnSuccessFalse_testHandleFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
+ }
+
+ @Test
+ public void testFromException() {
+ // arrange to generate an exception when operation runs
+ oper.setGenException(true);
+
+ verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
+ }
+
+ /**
+ * Tests fromException() when there is no exception.
+ */
+ @Test
+ public void testFromExceptionNoExcept() {
+ verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
+ }
+
+ /**
+ * Tests verifyRunning() when the pipeline is not running.
+ */
+ @Test
+ public void testVerifyRunningWhenNot() {
+ verifyRun("testVerifyRunningWhenNot", 0, 0, PolicyResult.SUCCESS, future -> future.cancel(false));
+ }
+
+ /**
+ * Tests callbackStarted() when the pipeline has already been stopped.
+ */
+ @Test
+ public void testCallbackStartedNotRunning() {
+ AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>();
+
+ /*
+ * arrange to stop the controller when the start-callback is invoked, but capture
+ * the outcome
+ */
+ params = params.toBuilder().startCallback(oper -> {
+ starter(oper);
+ future.get().cancel(false);
+ }).build();
+
+ future.set(oper.startOperation(params));
+ assertTrue(executor.runAll());
+
+ // should have only run once
+ assertEquals(1, numStart);
+ }
+
+ /**
+ * Tests callbackCompleted() when the pipeline has already been stopped.
+ */
+ @Test
+ public void testCallbackCompletedNotRunning() {
+ AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>();
+
+ // arrange to stop the controller when the start-callback is invoked
+ params = params.toBuilder().startCallback(oper -> {
+ future.get().cancel(false);
+ }).build();
+
+ future.set(oper.startOperation(params));
+ assertTrue(executor.runAll());
+
+ // should not have been set
+ assertNull(opend);
+ assertEquals(0, numEnd);
+ }
+
+ @Test
+ public void testSetOutcomeControlLoopOperationThrowable() {
+ final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
+
+ ControlLoopOperation outcome;
+
+ outcome = new ControlLoopOperation();
+ oper.setOutcome(params, outcome, timex);
+ assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
+ assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), outcome.getOutcome());
+
+ outcome = new ControlLoopOperation();
+ oper.setOutcome(params, outcome, new IllegalStateException());
+ assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
+ assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), outcome.getOutcome());
+ }
+
+ @Test
+ public void testSetOutcomeControlLoopOperationPolicyResult() {
+ ControlLoopOperation outcome;
+
+ outcome = new ControlLoopOperation();
+ oper.setOutcome(params, outcome, PolicyResult.SUCCESS);
+ assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
+ assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome());
+
+ for (PolicyResult result : FAILURE_RESULTS) {
+ outcome = new ControlLoopOperation();
+ oper.setOutcome(params, outcome, result);
+ assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
+ assertEquals(result.toString(), result.toString(), outcome.getOutcome());
+ }
+ }
+
+ @Test
+ public void testIsTimeout() {
+ final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
+
+ assertFalse(oper.isTimeout(new IllegalStateException()));
+ assertFalse(oper.isTimeout(new IllegalStateException(timex)));
+ assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
+ assertFalse(oper.isTimeout(new CompletionException(null)));
+ assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
+
+ assertTrue(oper.isTimeout(timex));
+ assertTrue(oper.isTimeout(new CompletionException(timex)));
+ }
+
+ @Test
+ public void testGetTimeOutMillis() {
+ assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(policy));
+
+ policy.setTimeout(null);
+ assertEquals(0, oper.getTimeOutMillis(policy));
+ }
+
+ private void starter(ControlLoopOperation oper) {
+ ++numStart;
+ tstart = oper.getStart();
+ opstart = oper;
+ }
+
+ private void completer(ControlLoopOperation oper) {
+ ++numEnd;
+ opend = oper;
+ }
+
+ /**
+ * Gets a function that does nothing.
+ *
+ * @param <T> type of input parameter expected by the function
+ * @return a function that does nothing
+ */
+ private <T> Consumer<T> noop() {
+ return unused -> {
+ };
+ }
+
+ /**
+ * Verifies a run.
+ *
+ * @param testName test name
+ * @param expectedCallbacks number of callbacks expected
+ * @param expectedOperations number of operation invocations expected
+ * @param expectedResult expected outcome
+ */
+ private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
+ PolicyResult expectedResult) {
+
+ String expectedSubRequestId =
+ (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
+
+ verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
+ }
+
+ /**
+ * Verifies a run.
+ *
+ * @param testName test name
+ * @param expectedCallbacks number of callbacks expected
+ * @param expectedOperations number of operation invocations expected
+ * @param expectedResult expected outcome
+ * @param manipulator function to modify the future returned by
+ * {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
+ * the tasks in the executor are run
+ */
+ private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
+ Consumer<CompletableFuture<ControlLoopOperation>> manipulator) {
+
+ String expectedSubRequestId =
+ (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
+
+ verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, manipulator);
+ }
+
+ /**
+ * Verifies a run.
+ *
+ * @param testName test name
+ * @param expectedCallbacks number of callbacks expected
+ * @param expectedOperations number of operation invocations expected
+ * @param expectedResult expected outcome
+ * @param expectedSubRequestId expected sub request ID
+ * @param manipulator function to modify the future returned by
+ * {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
+ * the tasks in the executor are run
+ */
+ private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
+ String expectedSubRequestId, Consumer<CompletableFuture<ControlLoopOperation>> manipulator) {
+
+ CompletableFuture<ControlLoopOperation> future = oper.startOperation(params);
+
+ manipulator.accept(future);
+
+ assertTrue(testName, executor.runAll());
+
+ assertEquals(testName, expectedCallbacks, numStart);
+ assertEquals(testName, expectedCallbacks, numEnd);
+
+ if (expectedCallbacks > 0) {
+ assertNotNull(testName, opstart);
+ assertNotNull(testName, opend);
+ assertEquals(testName, expectedResult.toString(), opend.getOutcome());
+
+ assertSame(testName, tstart, opstart.getStart());
+ assertSame(testName, tstart, opend.getStart());
+
+ try {
+ assertTrue(future.isDone());
+ assertSame(testName, opend, future.get());
+
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+
+ if (expectedOperations > 0) {
+ assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
+ }
+ }
+
+ assertEquals(testName, expectedOperations, oper.getCount());
+ }
+
+ private static class MyOper extends OperatorPartial {
+ @Getter
+ private int count = 0;
+
+ @Setter
+ private boolean genException;
+
+ @Setter
+ private int maxFailures = 0;
+
+ @Setter
+ private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preProcessor;
+
+ public MyOper() {
+ super(ACTOR, OPERATOR);
+ }
+
+ @Override
+ protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
+ ControlLoopOperation operation) {
+ ++count;
+ if (genException) {
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ }
+
+ operation.setSubRequestId(String.valueOf(attempt));
+
+ if (count > maxFailures) {
+ operation.setOutcome(PolicyResult.SUCCESS.toString());
+ } else {
+ operation.setOutcome(PolicyResult.FAILURE.toString());
+ }
+
+ return operation;
+ }
+
+ @Override
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
+ ControlLoopOperationParams params) {
+
+ return (preProcessor != null ? preProcessor : super.doPreprocessorAsFuture(params));
+ }
+ }
+
+ /**
+ * Executor that will run tasks until the queue is empty or a maximum number of tasks
+ * have been executed.
+ */
+ private static class MyExec implements Executor {
+ private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
+
+ private Queue<Runnable> commands = new LinkedList<>();
+
+ public MyExec() {
+ // do nothing
+ }
+
+ public int getQueueLength() {
+ return commands.size();
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ commands.add(command);
+ }
+
+ public boolean runAll() {
+ for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
+ commands.remove().run();
+ }
+
+ return commands.isEmpty();
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartialTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartialTest.java
new file mode 100644
index 000000000..7a822c1d9
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartialTest.java
@@ -0,0 +1,212 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class StartConfigPartialTest {
+ private static final IllegalArgumentException EXPECTED_EXCEPTION =
+ new IllegalArgumentException("expected exception");
+ private static final String MY_NAME = "my-name";
+ private static final String PARAMS = "config data";
+ private static final String PARAMS2 = "config data #2";
+ private static final String PARAMSX = "config data exception";
+
+ private StartConfigPartial<String> config;
+
+ /**
+ * Creates a config whose doXxx() methods do nothing.
+ */
+ @Before
+ public void setUp() {
+ config = new StartConfigPartial<>(MY_NAME) {
+ @Override
+ protected void doConfigure(String parameters) {
+ // do nothing
+ }
+
+ @Override
+ protected void doStart() {
+ // do nothing
+ }
+
+ @Override
+ protected void doStop() {
+ // do nothing
+ }
+
+ @Override
+ protected void doShutdown() {
+ // do nothing
+ }
+ };
+
+ config = spy(config);
+ }
+
+ @Test
+ public void testConfigImpl_testGetFullName() {
+ assertEquals(MY_NAME, config.getFullName());
+ }
+
+ @Test
+ public void testIsAlive() {
+ assertFalse(config.isAlive());
+ }
+
+ @Test
+ public void testIsConfigured_testConfigure() {
+ // throw an exception during doConfigure(), but should remain unconfigured
+ assertFalse(config.isConfigured());
+ doThrow(EXPECTED_EXCEPTION).when(config).doConfigure(PARAMSX);
+ assertThatIllegalArgumentException().isThrownBy(() -> config.configure(PARAMSX)).isEqualTo(EXPECTED_EXCEPTION);
+ assertFalse(config.isConfigured());
+
+ assertFalse(config.isConfigured());
+ config.configure(PARAMS);
+ verify(config).doConfigure(PARAMS);
+ assertTrue(config.isConfigured());
+
+ // should not be able to re-configure while running
+ config.start();
+ assertThatIllegalStateException().isThrownBy(() -> config.configure(PARAMS2)).withMessageContaining(MY_NAME);
+ verify(config, never()).doConfigure(PARAMS2);
+
+ // should be able to re-configure after stopping
+ config.stop();
+ config.configure(PARAMS2);
+ verify(config).doConfigure(PARAMS2);
+ assertTrue(config.isConfigured());
+
+ // should remain configured after exception
+ doThrow(EXPECTED_EXCEPTION).when(config).doConfigure(PARAMSX);
+ assertThatIllegalArgumentException().isThrownBy(() -> config.configure(PARAMSX)).isEqualTo(EXPECTED_EXCEPTION);
+ assertTrue(config.isConfigured());
+ }
+
+ @Test
+ public void testStart() {
+ assertFalse(config.isAlive());
+
+ // can't start if not configured yet
+ assertThatIllegalStateException().isThrownBy(() -> config.start()).withMessageContaining(MY_NAME);
+ assertFalse(config.isAlive());
+
+ config.configure(PARAMS);
+
+ config.start();
+ verify(config).doStart();
+ assertTrue(config.isAlive());
+ assertTrue(config.isConfigured());
+
+ // ok to restart when running, but shouldn't invoke doStart() again
+ config.start();
+ verify(config).doStart();
+ assertTrue(config.isAlive());
+ assertTrue(config.isConfigured());
+
+ // should never have invoked these
+ verify(config, never()).doStop();
+ verify(config, never()).doShutdown();
+
+ // throw exception when started again, but should remain stopped
+ config.stop();
+ doThrow(EXPECTED_EXCEPTION).when(config).doStart();
+ assertThatIllegalArgumentException().isThrownBy(() -> config.start()).isEqualTo(EXPECTED_EXCEPTION);
+ assertFalse(config.isAlive());
+ assertTrue(config.isConfigured());
+ }
+
+ @Test
+ public void testStop() {
+ config.configure(PARAMS);
+
+ // ok to stop if not running, but shouldn't invoke doStop()
+ config.stop();
+ verify(config, never()).doStop();
+ assertFalse(config.isAlive());
+ assertTrue(config.isConfigured());
+
+ config.start();
+
+ // now stop should have an effect
+ config.stop();
+ verify(config).doStop();
+ assertFalse(config.isAlive());
+ assertTrue(config.isConfigured());
+
+ // should have only invoked this once
+ verify(config).doStart();
+
+ // should never have invoked these
+ verify(config, never()).doShutdown();
+
+ // throw exception when stopped again, but should go ahead and stop
+ config.start();
+ doThrow(EXPECTED_EXCEPTION).when(config).doStop();
+ assertThatIllegalArgumentException().isThrownBy(() -> config.stop()).isEqualTo(EXPECTED_EXCEPTION);
+ assertFalse(config.isAlive());
+ assertTrue(config.isConfigured());
+ }
+
+ @Test
+ public void testShutdown() {
+ config.configure(PARAMS);
+
+ // ok to shutdown if not running, but shouldn't invoke doShutdown()
+ config.shutdown();
+ verify(config, never()).doShutdown();
+ assertFalse(config.isAlive());
+ assertTrue(config.isConfigured());
+
+ config.start();
+
+ // now stop should have an effect
+ config.shutdown();
+ verify(config).doShutdown();
+ assertFalse(config.isAlive());
+ assertTrue(config.isConfigured());
+
+ // should have only invoked this once
+ verify(config).doStart();
+
+ // should never have invoked these
+ verify(config, never()).doStop();
+
+ // throw exception when shut down again, but should go ahead and shut down
+ config.start();
+ doThrow(EXPECTED_EXCEPTION).when(config).doShutdown();
+ assertThatIllegalArgumentException().isThrownBy(() -> config.shutdown()).isEqualTo(EXPECTED_EXCEPTION);
+ assertFalse(config.isAlive());
+ assertTrue(config.isConfigured());
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java
new file mode 100644
index 000000000..0c8e77d38
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java
@@ -0,0 +1,314 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams.ControlLoopOperationParamsBuilder;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+import org.onap.policy.controlloop.policy.Policy;
+
+public class ControlLoopOperationParamsTest {
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String ACTOR = "my-actor";
+ private static final String OPERATION = "my-operation";
+ private static final String TARGET = "my-target";
+ private static final UUID REQ_ID = UUID.randomUUID();
+
+ @Mock
+ private Actor actor;
+
+ @Mock
+ private ActorService actorService;
+
+ @Mock
+ private Consumer<ControlLoopOperation> completer;
+
+ @Mock
+ private ControlLoopEventContext context;
+
+ @Mock
+ private VirtualControlLoopEvent event;
+
+ @Mock
+ private Executor executor;
+
+ @Mock
+ private CompletableFuture<ControlLoopOperation> operation;
+
+ @Mock
+ private Operator operator;
+
+ @Mock
+ private Policy policy;
+
+ @Mock
+ private Consumer<ControlLoopOperation> starter;
+
+ private ControlLoopOperationParams params;
+ private ControlLoopOperation outcome;
+
+
+ /**
+ * Initializes mocks and sets {@link #params} to a fully-loaded set of parameters.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ when(actorService.getActor(ACTOR)).thenReturn(actor);
+ when(actor.getOperator(OPERATION)).thenReturn(operator);
+ when(operator.startOperation(any())).thenReturn(operation);
+
+ when(event.getRequestId()).thenReturn(REQ_ID);
+
+ when(context.getEvent()).thenReturn(event);
+
+ when(policy.getActor()).thenReturn(ACTOR);
+ when(policy.getRecipe()).thenReturn(OPERATION);
+
+ params = ControlLoopOperationParams.builder().actorService(actorService).completeCallback(completer)
+ .context(context).executor(executor).policy(policy).startCallback(starter).target(TARGET)
+ .build();
+
+ outcome = params.makeOutcome();
+ }
+
+ @Test
+ public void testStart() {
+ assertSame(operation, params.start());
+
+ assertThatIllegalArgumentException().isThrownBy(() -> params.toBuilder().context(null).build().start());
+ }
+
+ @Test
+ public void testGetActor() {
+ assertEquals(ACTOR, params.getActor());
+
+ // try with null policy
+ assertEquals(ControlLoopOperationParams.UNKNOWN, params.toBuilder().policy(null).build().getActor());
+
+ // try with null name in the policy
+ when(policy.getActor()).thenReturn(null);
+ assertEquals(ControlLoopOperationParams.UNKNOWN, params.getActor());
+ }
+
+ @Test
+ public void testGetOperation() {
+ assertEquals(OPERATION, params.getOperation());
+
+ // try with null policy
+ assertEquals(ControlLoopOperationParams.UNKNOWN, params.toBuilder().policy(null).build().getOperation());
+
+ // try with null name in the policy
+ when(policy.getRecipe()).thenReturn(null);
+ assertEquals(ControlLoopOperationParams.UNKNOWN, params.getOperation());
+ }
+
+ @Test
+ public void testGetRequestId() {
+ assertSame(REQ_ID, params.getRequestId());
+
+ // try with null context
+ assertNull(params.toBuilder().context(null).build().getRequestId());
+
+ // try with null event
+ when(context.getEvent()).thenReturn(null);
+ assertNull(params.getRequestId());
+ }
+
+ @Test
+ public void testMakeOutcome() {
+ assertEquals(ACTOR, outcome.getActor());
+ assertEquals(OPERATION, outcome.getOperation());
+ checkRemainingFields("with actor");
+
+ // try again with a null policy
+ outcome = params.toBuilder().policy(null).build().makeOutcome();
+ assertEquals(ControlLoopOperationParams.UNKNOWN, outcome.getActor());
+ assertEquals(ControlLoopOperationParams.UNKNOWN, outcome.getOperation());
+ checkRemainingFields("unknown actor");
+ }
+
+ protected void checkRemainingFields(String testName) {
+ assertEquals(testName, TARGET, outcome.getTarget());
+ assertNotNull(testName, outcome.getStart());
+ assertNull(testName, outcome.getEnd());
+ assertNull(testName, outcome.getSubRequestId());
+ assertNull(testName, outcome.getOutcome());
+ assertNull(testName, outcome.getMessage());
+ }
+
+ @Test
+ public void testCallbackStarted() {
+ params.callbackStarted(outcome);
+ verify(starter).accept(outcome);
+
+ // modify starter to throw an exception
+ AtomicInteger count = new AtomicInteger();
+ doAnswer(args -> {
+ count.incrementAndGet();
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ }).when(starter).accept(outcome);
+
+ params.callbackStarted(outcome);
+ verify(starter, times(2)).accept(outcome);
+ assertEquals(1, count.get());
+
+ // repeat with no start-callback - no additional calls expected
+ params.toBuilder().startCallback(null).build().callbackStarted(outcome);
+ verify(starter, times(2)).accept(outcome);
+ assertEquals(1, count.get());
+
+ // should not call complete-callback
+ verify(completer, never()).accept(any());
+ }
+
+ @Test
+ public void testCallbackCompleted() {
+ params.callbackCompleted(outcome);
+ verify(completer).accept(outcome);
+
+ // modify completer to throw an exception
+ AtomicInteger count = new AtomicInteger();
+ doAnswer(args -> {
+ count.incrementAndGet();
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ }).when(completer).accept(outcome);
+
+ params.callbackCompleted(outcome);
+ verify(completer, times(2)).accept(outcome);
+ assertEquals(1, count.get());
+
+ // repeat with no complete-callback - no additional calls expected
+ params.toBuilder().completeCallback(null).build().callbackCompleted(outcome);
+ verify(completer, times(2)).accept(outcome);
+ assertEquals(1, count.get());
+
+ // should not call start-callback
+ verify(starter, never()).accept(any());
+ }
+
+ @Test
+ public void testValidateFields() {
+ testValidate("actorService", "null", bldr -> bldr.actorService(null));
+ testValidate("context", "null", bldr -> bldr.context(null));
+ testValidate("executor", "null", bldr -> bldr.executor(null));
+ testValidate("policy", "null", bldr -> bldr.policy(null));
+ testValidate("target", "null", bldr -> bldr.target(null));
+
+ // check edge cases
+ assertTrue(params.toBuilder().build().validate().isValid());
+
+ // these can be null
+ assertTrue(params.toBuilder().startCallback(null).completeCallback(null).build().validate().isValid());
+
+ // test with minimal fields
+ assertTrue(ControlLoopOperationParams.builder().actorService(actorService).context(context).policy(policy)
+ .target(TARGET).build().validate().isValid());
+ }
+
+ private void testValidate(String fieldName, String expected,
+ Function<ControlLoopOperationParamsBuilder, ControlLoopOperationParamsBuilder> makeInvalid) {
+
+ // original params should be valid
+ BeanValidationResult result = params.validate();
+ assertTrue(fieldName, result.isValid());
+
+ // make invalid params
+ result = makeInvalid.apply(params.toBuilder()).build().validate();
+ assertFalse(fieldName, result.isValid());
+ assertThat(result.getResult()).contains(fieldName).contains(expected);
+ }
+
+ @Test
+ public void testBuilder_testToBuilder() {
+ assertEquals(params, params.toBuilder().build());
+ }
+
+ @Test
+ public void testActorService() {
+ assertSame(actorService, params.getActorService());
+ }
+
+ @Test
+ public void testGetContext() {
+ assertSame(context, params.getContext());
+ }
+
+ @Test
+ public void testGetExecutor() {
+ assertSame(executor, params.getExecutor());
+
+ // should use default when unspecified
+ assertSame(ForkJoinPool.commonPool(), ControlLoopOperationParams.builder().build().getExecutor());
+ }
+
+ @Test
+ public void testGetPolicy() {
+ assertSame(policy, params.getPolicy());
+ }
+
+ @Test
+ public void testGetStartCallback() {
+ assertSame(starter, params.getStartCallback());
+ }
+
+ @Test
+ public void testGetCompleteCallback() {
+ assertSame(completer, params.getCompleteCallback());
+ }
+
+ @Test
+ public void testGetTarget() {
+ assertEquals(TARGET, params.getTarget());
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java
new file mode 100644
index 000000000..1763388f2
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java
@@ -0,0 +1,130 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ValidationResult;
+
+public class HttpActorParamsTest {
+
+ private static final String CONTAINER = "my-container";
+ private static final String CLIENT = "my-client";
+ private static final long TIMEOUT = 10;
+
+ private static final String PATH1 = "path #1";
+ private static final String PATH2 = "path #2";
+ private static final String URI1 = "uri #1";
+ private static final String URI2 = "uri #2";
+
+ private Map<String, String> paths;
+ private HttpActorParams params;
+
+ /**
+ * Initializes {@link #paths} with two items and {@link params} with a fully populated
+ * object.
+ */
+ @Before
+ public void setUp() {
+ paths = new TreeMap<>();
+ paths.put(PATH1, URI1);
+ paths.put(PATH2, URI2);
+
+ params = makeHttpActorParams();
+ }
+
+ @Test
+ public void testMakeOperationParameters() {
+ Function<String, Map<String, Object>> maker = params.makeOperationParameters(CONTAINER);
+ assertNull(maker.apply("unknown-operation"));
+
+ Map<String, Object> subparam = maker.apply(PATH1);
+ assertNotNull(subparam);
+ assertEquals("{clientName=my-client, path=uri #1, timeoutSec=10}", new TreeMap<>(subparam).toString());
+
+ subparam = maker.apply(PATH2);
+ assertNotNull(subparam);
+ assertEquals("{clientName=my-client, path=uri #2, timeoutSec=10}", new TreeMap<>(subparam).toString());
+ }
+
+ @Test
+ public void testDoValidation() {
+ assertThatCode(() -> params.doValidation(CONTAINER)).doesNotThrowAnyException();
+
+ // invalid param
+ params.setClientName(null);
+ assertThatThrownBy(() -> params.doValidation(CONTAINER))
+ .isInstanceOf(ParameterValidationRuntimeException.class);
+ }
+
+ @Test
+ public void testValidate() {
+ testValidateField("clientName", "null", params2 -> params2.setClientName(null));
+ testValidateField("path", "null", params2 -> params2.setPath(null));
+ testValidateField("timeoutSec", "minimum", params2 -> params2.setTimeoutSec(-1));
+
+ // check edge cases
+ params.setTimeoutSec(0);
+ assertTrue(params.validate(CONTAINER).isValid());
+
+ params.setTimeoutSec(1);
+ assertTrue(params.validate(CONTAINER).isValid());
+
+ // one path value is null
+ testValidateField(PATH2, "null", params2 -> paths.put(PATH2, null));
+ }
+
+ private void testValidateField(String fieldName, String expected, Consumer<HttpActorParams> makeInvalid) {
+
+ // original params should be valid
+ ValidationResult result = params.validate(CONTAINER);
+ assertTrue(fieldName, result.isValid());
+
+ // make invalid params
+ HttpActorParams params2 = makeHttpActorParams();
+ makeInvalid.accept(params2);
+ result = params2.validate(CONTAINER);
+ assertFalse(fieldName, result.isValid());
+ assertThat(result.getResult()).contains(CONTAINER).contains(fieldName).contains(expected);
+ }
+
+ private HttpActorParams makeHttpActorParams() {
+ HttpActorParams params2 = new HttpActorParams();
+ params2.setClientName(CLIENT);
+ params2.setTimeoutSec(TIMEOUT);
+ params2.setPath(paths);
+
+ return params2;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java
new file mode 100644
index 000000000..829c480d1
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java
@@ -0,0 +1,80 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.function.Function;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams.HttpParamsBuilder;
+
+public class HttpParamsTest {
+
+ private static final String CONTAINER = "my-container";
+ private static final String CLIENT = "my-client";
+ private static final String PATH = "my-path";
+ private static final long TIMEOUT = 10;
+
+ private HttpParams params;
+
+ @Before
+ public void setUp() {
+ params = HttpParams.builder().clientName(CLIENT).path(PATH).timeoutSec(TIMEOUT).build();
+ }
+
+ @Test
+ public void testValidate() {
+ testValidateField("clientName", "null", bldr -> bldr.clientName(null));
+ testValidateField("path", "null", bldr -> bldr.path(null));
+ testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1));
+
+ // check edge cases
+ assertTrue(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid());
+ assertTrue(params.toBuilder().timeoutSec(1).build().validate(CONTAINER).isValid());
+ }
+
+ @Test
+ public void testBuilder_testToBuilder() {
+ assertEquals(CLIENT, params.getClientName());
+ assertEquals(PATH, params.getPath());
+ assertEquals(TIMEOUT, params.getTimeoutSec());
+
+ assertEquals(params, params.toBuilder().build());
+ }
+
+ private void testValidateField(String fieldName, String expected,
+ Function<HttpParamsBuilder, HttpParamsBuilder> makeInvalid) {
+
+ // original params should be valid
+ ValidationResult result = params.validate(CONTAINER);
+ assertTrue(fieldName, result.isValid());
+
+ // make invalid params
+ result = makeInvalid.apply(params.toBuilder()).build().validate(CONTAINER);
+ assertFalse(fieldName, result.isValid());
+ assertThat(result.getResult()).contains(fieldName).contains(expected);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeExceptionTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeExceptionTest.java
new file mode 100644
index 000000000..9879f604f
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeExceptionTest.java
@@ -0,0 +1,82 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ObjectValidationResult;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+
+public class ParameterValidationRuntimeExceptionTest {
+
+ private static final String THE_MESSAGE = "the message";
+ private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
+
+ private ValidationResult result;
+
+ @Before
+ public void setUp() {
+ result = new ObjectValidationResult("param", null, ValidationStatus.INVALID, "null");
+ }
+
+ @Test
+ public void testParameterValidationExceptionValidationResult() {
+ ParameterValidationRuntimeException ex = new ParameterValidationRuntimeException(result);
+ assertSame(result, ex.getResult());
+ assertNull(ex.getMessage());
+ }
+
+ @Test
+ public void testParameterValidationExceptionValidationResultString() {
+ ParameterValidationRuntimeException ex = new ParameterValidationRuntimeException(THE_MESSAGE, result);
+ assertSame(result, ex.getResult());
+ assertEquals(THE_MESSAGE, ex.getMessage());
+ }
+
+ @Test
+ public void testParameterValidationExceptionValidationResultThrowable() {
+ ParameterValidationRuntimeException ex = new ParameterValidationRuntimeException(EXPECTED_EXCEPTION, result);
+ assertSame(result, ex.getResult());
+ assertEquals(EXPECTED_EXCEPTION.toString(), ex.getMessage());
+ assertEquals(EXPECTED_EXCEPTION, ex.getCause());
+ }
+
+ @Test
+ public void testParameterValidationExceptionValidationResultStringThrowable() {
+ ParameterValidationRuntimeException ex =
+ new ParameterValidationRuntimeException(THE_MESSAGE, EXPECTED_EXCEPTION, result);
+ assertSame(result, ex.getResult());
+ assertEquals(THE_MESSAGE, ex.getMessage());
+ assertEquals(EXPECTED_EXCEPTION, ex.getCause());
+ }
+
+ @Test
+ public void testGetResult() {
+ ParameterValidationRuntimeException ex = new ParameterValidationRuntimeException(result);
+ assertSame(result, ex.getResult());
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParamsTest.java
new file mode 100644
index 000000000..4834c98d2
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParamsTest.java
@@ -0,0 +1,80 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.function.Function;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicParams.TopicParamsBuilder;
+
+public class TopicParamsTest {
+
+ private static final String CONTAINER = "my-container";
+ private static final String TARGET = "my-target";
+ private static final String SOURCE = "my-source";
+ private static final long TIMEOUT = 10;
+
+ private TopicParams params;
+
+ @Before
+ public void setUp() {
+ params = TopicParams.builder().target(TARGET).source(SOURCE).timeoutSec(TIMEOUT).build();
+ }
+
+ @Test
+ public void testValidate() {
+ testValidateField("target", "null", bldr -> bldr.target(null));
+ testValidateField("source", "null", bldr -> bldr.source(null));
+ testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1));
+
+ // check edge cases
+ assertTrue(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid());
+ assertTrue(params.toBuilder().timeoutSec(1).build().validate(CONTAINER).isValid());
+ }
+
+ @Test
+ public void testBuilder_testToBuilder() {
+ assertEquals(TARGET, params.getTarget());
+ assertEquals(SOURCE, params.getSource());
+ assertEquals(TIMEOUT, params.getTimeoutSec());
+
+ assertEquals(params, params.toBuilder().build());
+ }
+
+ private void testValidateField(String fieldName, String expected,
+ Function<TopicParamsBuilder, TopicParamsBuilder> makeInvalid) {
+
+ // original params should be valid
+ ValidationResult result = params.validate(CONTAINER);
+ assertTrue(fieldName, result.isValid());
+
+ // make invalid params
+ result = makeInvalid.apply(params.toBuilder()).build().validate(CONTAINER);
+ assertFalse(fieldName, result.isValid());
+ assertThat(result.getResult()).contains(fieldName).contains(expected);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManagerTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManagerTest.java
new file mode 100644
index 000000000..de1cf0f8d
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManagerTest.java
@@ -0,0 +1,142 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.Future;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class FutureManagerTest {
+
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+
+ @Mock
+ private Future<String> future1;
+
+ @Mock
+ private Future<String> future2;
+
+ @Mock
+ private Future<String> future3;
+
+ private FutureManager mgr;
+
+ /**
+ * Initializes fields, including {@link #mgr}.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ mgr = new FutureManager();
+ }
+
+ @Test
+ public void testStop() {
+ mgr.add(future1);
+ mgr.add(future2);
+ mgr.add(future3);
+
+ // arrange for one to throw an exception
+ when(future2.cancel(anyBoolean())).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
+
+ // nothing should have been canceled yet
+ verify(future1, never()).cancel(anyBoolean());
+ verify(future2, never()).cancel(anyBoolean());
+ verify(future3, never()).cancel(anyBoolean());
+
+ assertTrue(mgr.isRunning());
+
+ // stop the controller
+
+ // stop the controller
+ mgr.stop();
+
+ // all controllers should now be stopped
+ assertFalse(mgr.isRunning());
+
+ // everything should have been invoked
+ verify(future1).cancel(anyBoolean());
+ verify(future2).cancel(anyBoolean());
+ verify(future3).cancel(anyBoolean());
+
+ // re-invoking stop should have no effect on the listeners
+ mgr.stop();
+
+ verify(future1).cancel(anyBoolean());
+ verify(future2).cancel(anyBoolean());
+ verify(future3).cancel(anyBoolean());
+ }
+
+ @Test
+ public void testAdd() {
+ // still running - this should not be invoked
+ mgr.add(future1);
+ verify(future1, never()).cancel(anyBoolean());
+
+ // re-add should have no impact
+ mgr.add(future1);
+ verify(future1, never()).cancel(anyBoolean());
+
+ mgr.stop();
+
+ verify(future1).cancel(anyBoolean());
+
+ // new additions should be invoked immediately
+ mgr.add(future2);
+ verify(future2).cancel(anyBoolean());
+
+ // should work with exceptions, too
+ when(future3.cancel(anyBoolean())).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
+ mgr.add(future3);
+ }
+
+ @Test
+ public void testRemove() {
+ mgr.add(future1);
+ mgr.add(future2);
+
+ verify(future1, never()).cancel(anyBoolean());
+ verify(future2, never()).cancel(anyBoolean());
+
+ // remove the second
+ mgr.remove(future2);
+
+ // should be able to remove it again
+ mgr.remove(future2);
+
+ mgr.stop();
+
+ // first should have run, but not the second
+ verify(future1).cancel(anyBoolean());
+
+ verify(future2, never()).cancel(anyBoolean());
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManagerTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManagerTest.java
new file mode 100644
index 000000000..4a882d422
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManagerTest.java
@@ -0,0 +1,134 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class ListenerManagerTest {
+
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+
+ @Mock
+ private Runnable runnable1;
+
+ @Mock
+ private Runnable runnable2;
+
+ @Mock
+ private Runnable runnable3;
+
+ private ListenerManager mgr;
+
+ /**
+ * Initializes fields, including {@link #mgr}.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ mgr = new ListenerManager();
+ }
+
+ @Test
+ public void testStop_testIsRunning() {
+ mgr.add(runnable1);
+ mgr.add(runnable2);
+ mgr.add(runnable3);
+
+ // arrange for one to throw an exception
+ doThrow(new IllegalStateException(EXPECTED_EXCEPTION)).when(runnable2).run();
+
+ // nothing should have been canceled yet
+ verify(runnable1, never()).run();
+ verify(runnable2, never()).run();
+ verify(runnable3, never()).run();
+
+ assertTrue(mgr.isRunning());
+
+ // stop the controller
+ mgr.stop();
+
+ // all controllers should now be stopped
+ assertFalse(mgr.isRunning());
+
+ // everything should have been invoked
+ verify(runnable1).run();
+ verify(runnable2).run();
+ verify(runnable3).run();
+
+ // re-invoking stop should have no effect on the listeners
+ mgr.stop();
+
+ verify(runnable1).run();
+ verify(runnable2).run();
+ verify(runnable3).run();
+ }
+
+ @Test
+ public void testAdd() {
+ // still running - this should not be invoked
+ mgr.add(runnable1);
+ verify(runnable1, never()).run();
+
+ mgr.stop();
+
+ verify(runnable1).run();
+
+ // new additions should be invoked immediately
+ mgr.add(runnable2);
+ verify(runnable2).run();
+
+ // should work with exceptions, too
+ doThrow(new IllegalStateException(EXPECTED_EXCEPTION)).when(runnable3).run();
+ mgr.add(runnable3);
+ }
+
+ @Test
+ public void testRemove() {
+ mgr.add(runnable1);
+ mgr.add(runnable2);
+
+ verify(runnable1, never()).run();
+ verify(runnable2, never()).run();
+
+ // remove the second
+ mgr.remove(runnable2);
+
+ // should be able to remove it again
+ mgr.remove(runnable2);
+
+ mgr.stop();
+
+ // first should have run, but not the second
+ verify(runnable1).run();
+
+ verify(runnable2, never()).run();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java
new file mode 100644
index 000000000..b421c1ce2
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java
@@ -0,0 +1,254 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class PipelineControllerFutureTest {
+ private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
+ private static final String TEXT = "some text";
+
+ @Mock
+ private Runnable runnable1;
+
+ @Mock
+ private Runnable runnable2;
+
+ @Mock
+ private Future<String> future1;
+
+ @Mock
+ private Future<String> future2;
+
+ @Mock
+ private CompletableFuture<String> compFuture;
+
+
+ private PipelineControllerFuture<String> controller;
+
+
+ /**
+ * Initializes fields, including {@link #controller}. Adds all runners and futures to
+ * the controller.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ controller = new PipelineControllerFuture<>();
+
+ controller.add(runnable1);
+ controller.add(future1);
+ controller.add(runnable2);
+ controller.add(future2);
+ }
+
+ @Test
+ public void testCancel_testAddFutureOfFBoolean_testAddRunnable__testIsRunning() {
+ assertTrue(controller.isRunning());
+
+ assertTrue(controller.cancel(false));
+
+ assertTrue(controller.isCancelled());
+ assertFalse(controller.isRunning());
+
+ verify(runnable1).run();
+ verify(runnable2).run();
+ verify(future1).cancel(anyBoolean());
+ verify(future2).cancel(anyBoolean());
+
+ // re-invoke; nothing should change
+ assertTrue(controller.cancel(true));
+
+ assertTrue(controller.isCancelled());
+ assertFalse(controller.isRunning());
+
+ verify(runnable1).run();
+ verify(runnable2).run();
+ verify(future1).cancel(anyBoolean());
+ verify(future2).cancel(anyBoolean());
+ }
+
+ @Test
+ public void testDelayedComplete() throws Exception {
+ controller.add(runnable1);
+
+ BiConsumer<String, Throwable> stopper = controller.delayedComplete();
+
+ // shouldn't have run yet
+ assertTrue(controller.isRunning());
+ verify(runnable1, never()).run();
+
+ stopper.accept(TEXT, null);
+
+ assertTrue(controller.isDone());
+ assertEquals(TEXT, controller.get());
+
+ assertFalse(controller.isRunning());
+ verify(runnable1).run();
+
+ // re-invoke; nothing should change
+ stopper.accept(TEXT, EXPECTED_EXCEPTION);
+ assertFalse(controller.isCompletedExceptionally());
+
+ assertFalse(controller.isRunning());
+ verify(runnable1).run();
+ }
+
+ /**
+ * Tests delayedComplete() when an exception is generated.
+ */
+ @Test
+ public void testDelayedCompleteWithException() throws Exception {
+ controller.add(runnable1);
+
+ BiConsumer<String, Throwable> stopper = controller.delayedComplete();
+
+ // shouldn't have run yet
+ assertTrue(controller.isRunning());
+ verify(runnable1, never()).run();
+
+ stopper.accept(TEXT, EXPECTED_EXCEPTION);
+
+ assertTrue(controller.isDone());
+ assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
+
+ assertFalse(controller.isRunning());
+ verify(runnable1).run();
+
+ // re-invoke; nothing should change
+ stopper.accept(TEXT, null);
+ assertTrue(controller.isCompletedExceptionally());
+
+ assertFalse(controller.isRunning());
+ verify(runnable1).run();
+ }
+
+ @Test
+ public void testDelayedRemoveFutureOfF() throws Exception {
+ BiConsumer<String, Throwable> remover = controller.delayedRemove(future1);
+
+ remover.accept(TEXT, EXPECTED_EXCEPTION);
+
+ // should not have completed the controller
+ assertFalse(controller.isDone());
+
+ verify(future1, never()).cancel(anyBoolean());
+
+ controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
+
+ verify(future1, never()).cancel(anyBoolean());
+ verify(future2).cancel(anyBoolean());
+ }
+
+ @Test
+ public void testDelayedRemoveRunnable() throws Exception {
+ BiConsumer<String, Throwable> remover = controller.delayedRemove(runnable1);
+
+ remover.accept(TEXT, EXPECTED_EXCEPTION);
+
+ // should not have completed the controller
+ assertFalse(controller.isDone());
+
+ verify(runnable1, never()).run();
+
+ controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
+
+ verify(runnable1, never()).run();
+ verify(runnable2).run();
+ }
+
+ @Test
+ public void testRemoveFutureOfF_testRemoveRunnable() {
+ controller.remove(runnable2);
+ controller.remove(future1);
+
+ controller.cancel(true);
+
+ verify(runnable1).run();
+ verify(runnable2, never()).run();
+ verify(future1, never()).cancel(anyBoolean());
+ verify(future2).cancel(anyBoolean());
+ }
+
+ @Test
+ public void testAddFunction() {
+ AtomicReference<String> value = new AtomicReference<>();
+
+ Function<String, CompletableFuture<String>> func = controller.add(input -> {
+ value.set(input);
+ return compFuture;
+ });
+
+ assertSame(compFuture, func.apply(TEXT));
+ assertEquals(TEXT, value.get());
+
+ verify(compFuture, never()).cancel(anyBoolean());
+
+ // should not have completed the controller
+ assertFalse(controller.isDone());
+
+ // cancel - should propagate
+ controller.cancel(false);
+
+ verify(compFuture).cancel(anyBoolean());
+ }
+
+ /**
+ * Tests add(Function) when the controller is not running.
+ */
+ @Test
+ public void testAddFunctionNotRunning() {
+ AtomicReference<String> value = new AtomicReference<>();
+
+ Function<String, CompletableFuture<String>> func = controller.add(input -> {
+ value.set(input);
+ return compFuture;
+ });
+
+ controller.cancel(false);
+
+ CompletableFuture<String> fut = func.apply(TEXT);
+ assertNotSame(compFuture, fut);
+ assertFalse(fut.isDone());
+
+ assertNull(value.get());
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..f8a1e5112
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START=======================================================
+ ONAP
+ ================================================================================
+ Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+-->
+
+<configuration>
+
+ <contextName>Actors</contextName>
+ <statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" />
+
+ <!-- USE FOR STD OUT ONLY -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <Pattern>%d %level %msg%n</Pattern>
+ </encoder>
+ </appender>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+ <!-- this is just an example -->
+ <logger name="ch.qos.logback.classic" level="off" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+</configuration>