aboutsummaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors/actorServiceProvider/src/main
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2020-01-23 18:56:40 -0500
committerJim Hahn <jrh3@att.com>2020-02-03 16:38:39 -0500
commit89ce8bc6075096d015cdc8735634e0be14fe0357 (patch)
tree166892169d2a52ebdddafc0f2c8bba2308fdad0d /models-interactions/model-actors/actorServiceProvider/src/main
parentd789d26741b22fca83168ab209e517fbfbcefcc6 (diff)
Actor redesign.
Left original code intact so that it can continue to be used until everything has been converted to use the new approach. Simply added new methods and classes. (A few minor edits were required to the old code, e.g., added constructors to the Actor implementations). Code to be removed is annotated with "TODO". This only contains one revised actor, SDNC. This actor combines code from actor.sdnc, sdnc, and drools-applications. Coverage tests are incomplete, but I anticipate some simplification to this design in a couple of days; coverage will be added at that time. Issue-ID: POLICY-1625 Signed-off-by: Jim Hahn <jrh3@att.com> Change-Id: I4b75730e3621a9ee026ad10e557abe92df10dcf4
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java167
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java65
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java58
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java114
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java90
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java239
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java757
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java153
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java210
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java112
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java69
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java57
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java68
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java89
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java115
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java157
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java52
17 files changed, 2546 insertions, 26 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
index 809936146..13f09b1ad 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ActorService
* ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,25 +21,56 @@
package org.onap.policy.controlloop.actorserviceprovider;
-import com.google.common.collect.ImmutableList;
-
-import java.util.Iterator;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.ServiceLoader;
-
+import java.util.Set;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.impl.StartConfigPartial;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ActorService {
-
+/**
+ * Service that manages a set of actors. To use the service, first invoke
+ * {@link #configure(Map)} to configure all of the actors, and then invoke
+ * {@link #start()} to start all of the actors. When finished using the actor service,
+ * invoke {@link #stop()} or {@link #shutdown()}.
+ */
+public class ActorService extends StartConfigPartial<Map<String, Object>> {
private static final Logger logger = LoggerFactory.getLogger(ActorService.class);
- private static ActorService service;
- // USed to load actors
- private final ServiceLoader<Actor> loader;
+ private final Map<String, Actor> name2actor;
+
+ private static class LazyHolder {
+ static final ActorService INSTANCE = new ActorService();
+ }
+
+ /**
+ * Constructs the object and loads the list of actors.
+ */
+ protected ActorService() {
+ super("actors");
+
+ Map<String, Actor> map = new HashMap<>();
+
+ for (Actor newActor : loadActors()) {
+ map.compute(newActor.getName(), (name, existingActor) -> {
+ if (existingActor == null) {
+ return newActor;
+ }
+
+ // TODO: should this throw an exception?
+ logger.warn("duplicate actor names for {}: {}, ignoring {}", name,
+ existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName());
+ return existingActor;
+ });
+ }
- private ActorService() {
- loader = ServiceLoader.load(Actor.class);
+ name2actor = ImmutableMap.copyOf(map);
}
/**
@@ -47,27 +78,115 @@ public class ActorService {
*
* @return the instance
*/
- public static synchronized ActorService getInstance() {
- if (service == null) {
- service = new ActorService();
+ public static ActorService getInstance() {
+ return LazyHolder.INSTANCE;
+ }
+
+ /**
+ * Gets a particular actor.
+ *
+ * @param name name of the actor of interest
+ * @return the desired actor
+ * @throws IllegalArgumentException if no actor by the given name exists
+ */
+ public Actor getActor(String name) {
+ Actor actor = name2actor.get(name);
+ if (actor == null) {
+ throw new IllegalArgumentException("unknown actor " + name);
}
- return service;
+
+ return actor;
}
/**
- * Get the actors.
+ * Gets the actors.
*
* @return the actors
*/
- public ImmutableList<Actor> actors() {
- Iterator<Actor> iter = loader.iterator();
- logger.debug("returning actors");
- while (iter.hasNext()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Got {}", iter.next().actor());
+ public Collection<Actor> getActors() {
+ return name2actor.values();
+ }
+
+ /**
+ * Gets the names of the actors.
+ *
+ * @return the actor names
+ */
+ public Set<String> getActorNames() {
+ return name2actor.keySet();
+ }
+
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ logger.info("configuring actors");
+
+ BeanValidationResult valres = new BeanValidationResult("ActorService", parameters);
+
+ for (Actor actor : name2actor.values()) {
+ String actorName = actor.getName();
+ Map<String, Object> subparams = Util.translateToMap(actorName, parameters.get(actorName));
+
+ if (subparams != null) {
+
+ try {
+ actor.configure(subparams);
+
+ } catch (ParameterValidationRuntimeException e) {
+ logger.warn("failed to configure actor {}", actorName, e);
+ valres.addResult(e.getResult());
+
+ } catch (RuntimeException e) {
+ logger.warn("failed to configure actor {}", actorName, e);
+ }
+
+ } else if (actor.isConfigured()) {
+ logger.warn("missing configuration parameters for actor {}; using previous parameters", actorName);
+
+ } else {
+ logger.warn("missing configuration parameters for actor {}; actor cannot be started", actorName);
}
}
- return ImmutableList.copyOf(loader.iterator());
+ if (!valres.isValid() && logger.isWarnEnabled()) {
+ logger.warn("actor services validation errors:\n{}", valres.getResult());
+ }
+ }
+
+ @Override
+ protected void doStart() {
+ logger.info("starting actors");
+
+ for (Actor actor : name2actor.values()) {
+ if (actor.isConfigured()) {
+ Util.logException(actor::start, "failed to start actor {}", actor.getName());
+
+ } else {
+ logger.warn("not starting unconfigured actor {}", actor.getName());
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ logger.info("stopping actors");
+ name2actor.values()
+ .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName()));
+ }
+
+ @Override
+ protected void doShutdown() {
+ logger.info("shutting down actors");
+
+ // @formatter:off
+ name2actor.values().forEach(
+ actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName()));
+
+ // @formatter:on
+ }
+
+ // the following methods may be overridden by junit tests
+
+ protected Iterable<Actor> loadActors() {
+ return ServiceLoader.load(Actor.class);
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java
new file mode 100644
index 000000000..b7a9a53ad
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import lombok.AllArgsConstructor;
+
+/**
+ * Object whose {@link #toString()} method invokes {@link Object#toString()} on another
+ * object, on-demand. This assumes that the other object's method returns an object
+ * identifier. This is typically used to include an object's identifier in a log message.
+ */
+@AllArgsConstructor
+public class DelayedIdentString {
+ /**
+ * String to return for null objects or null object identifiers.
+ */
+ public static final String NULL_STRING = "null";
+
+ private final Object object;
+
+ /**
+ * Gets the object's identifier, after stripping anything appearing before '@'.
+ */
+ @Override
+ public String toString() {
+ if (object == null) {
+ return NULL_STRING;
+ }
+
+ String ident = objectToString();
+ if (ident == null) {
+ return NULL_STRING;
+ }
+
+ int index = ident.indexOf('@');
+ return (index > 0 ? ident.substring(index) : ident);
+ }
+
+ /**
+ * Invokes the object's {@link Object#toString()} method.
+ *
+ * @return the output from the object's {@link Object#toString()} method
+ */
+ protected String objectToString() {
+ return object.toString();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
new file mode 100644
index 000000000..e308ee42e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+/**
+ * This is the service interface for defining an Actor operation used in Control Loop
+ * Operational Policies for performing actions on runtime entities.
+ */
+public interface Operator extends Startable, Configurable<Map<String, Object>> {
+
+ /**
+ * Gets the name of the associated actor.
+ *
+ * @return the name of the associated actor
+ */
+ String getActorName();
+
+ /**
+ * Gets the name of the operation.
+ *
+ * @return the operation name
+ */
+ String getName();
+
+ /**
+ * Called by enforcement PDP engine to start the operation. As part of the operation,
+ * it invokes the "start" and "complete" call-backs found within the parameters.
+ *
+ * @param params parameters needed to start the operation
+ * @return a future that can be used to cancel or await the result of the operation
+ */
+ CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params);
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
new file mode 100644
index 000000000..0aba1a7fa
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
@@ -0,0 +1,114 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Actor utilities.
+ */
+public class Util {
+ private static final Logger logger = LoggerFactory.getLogger(Util.class);
+
+ private Util() {
+ // do nothing
+ }
+
+ /**
+ * Extracts an object's identity by invoking {@link Object#toString()} and returning
+ * the portion starting with "@". Extraction is done on-demand, when toString() is
+ * called on the result. This is typically used when logging.
+ *
+ * @param object object whose identity is to be extracted
+ * @return an object that will extract the source object's identity when this object's
+ * toString() method is called
+ */
+ public static Object ident(Object object) {
+ return new DelayedIdentString(object);
+ }
+
+ /**
+ * Runs a function and logs a message if it throws an exception. Does <i>not</i>
+ * re-throw the exception.
+ *
+ * @param function function to be run
+ * @param exceptionMessage message to log if an exception is thrown
+ * @param exceptionArgs arguments to be passed to the logger
+ */
+ public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) {
+ try {
+ function.run();
+
+ } catch (RuntimeException ex) {
+ // create a new array containing the original arguments plus the exception
+ Object[] allArgs = Arrays.copyOf(exceptionArgs, exceptionArgs.length + 1);
+ allArgs[exceptionArgs.length] = ex;
+
+ logger.warn(exceptionMessage, allArgs);
+ }
+ }
+
+ /**
+ * Translates parameters from one class to another, typically from a Map to a POJO or
+ * vice versa.
+ *
+ * @param identifier identifier of the actor/operation being translated; used to build
+ * an exception message
+ * @param source source object to be translated
+ * @param clazz target class
+ * @return the translated object
+ */
+ public static <T> T translate(String identifier, Object source, Class<T> clazz) {
+ Coder coder = new StandardCoder();
+
+ try {
+ String json = coder.encode(source);
+ return coder.decode(json, clazz);
+
+ } catch (CoderException | RuntimeException e) {
+ throw new IllegalArgumentException("cannot translate parameters for " + identifier, e);
+ }
+ }
+
+ /**
+ * Translates parameters to a Map.
+ *
+ * @param identifier identifier of the actor/operation being translated; used to build
+ * an exception message
+ * @param source source parameters
+ * @return the parameters, as a Map
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> translateToMap(String identifier, Object source) {
+ if (source == null) {
+ return null;
+ }
+
+ return translate(identifier, source, LinkedHashMap.class);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
new file mode 100644
index 000000000..68bbe7edc
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
@@ -0,0 +1,90 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.controlloop;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.aai.AaiCqResponse;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+
+/**
+ * Context associated with a control loop event.
+ */
+@Getter
+@Setter
+public class ControlLoopEventContext implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Setter(AccessLevel.NONE)
+ private final VirtualControlLoopEvent event;
+
+ private AaiCqResponse aaiCqResponse;
+
+ // TODO may remove this if it proves not to be needed
+ @Getter(AccessLevel.NONE)
+ @Setter(AccessLevel.NONE)
+ private Map<String, Serializable> properties = new ConcurrentHashMap<>();
+
+ /**
+ * Constructs the object.
+ *
+ * @param event event with which this is associated
+ */
+ public ControlLoopEventContext(VirtualControlLoopEvent event) {
+ this.event = event;
+ }
+
+ /**
+ * Determines if the context contains a property.
+ *
+ * @param name name of the property of interest
+ * @return {@code true} if the context contains the property, {@code false} otherwise
+ */
+ public boolean contains(String name) {
+ return properties.containsKey(name);
+ }
+
+ /**
+ * Gets a property, casting it to the desired type.
+ *
+ * @param <T> desired type
+ * @param name name of the property whose value is to be retrieved
+ * @return the property's value, or {@code null} if it does not yet have a value
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T getProperty(String name) {
+ return (T) properties.get(name);
+ }
+
+ /**
+ * Sets a property's value.
+ *
+ * @param name property name
+ * @param value new property value
+ */
+ public void setProperty(String name, Serializable value) {
+ properties.put(name, value);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
new file mode 100644
index 000000000..9b9aa914e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
@@ -0,0 +1,239 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of an actor.
+ */
+public class ActorImpl extends StartConfigPartial<Map<String, Object>> implements Actor {
+ private static final Logger logger = LoggerFactory.getLogger(ActorImpl.class);
+
+ /**
+ * Maps a name to an operator.
+ */
+ private Map<String, Operator> name2operator;
+
+ /**
+ * Constructs the object.
+ *
+ * @param name actor name
+ * @param operators the operations supported by this actor
+ */
+ public ActorImpl(String name, Operator... operators) {
+ super(name);
+ setOperators(Arrays.asList(operators));
+ }
+
+ /**
+ * Sets the operators supported by this actor, overriding any previous list.
+ *
+ * @param operators the operations supported by this actor
+ */
+ protected void setOperators(List<Operator> operators) {
+ if (isConfigured()) {
+ throw new IllegalStateException("attempt to set operators on a configured actor: " + getName());
+ }
+
+ Map<String, Operator> map = new HashMap<>();
+ for (Operator newOp : operators) {
+ map.compute(newOp.getName(), (opName, existingOp) -> {
+ if (existingOp == null) {
+ return newOp;
+ }
+
+ // TODO: should this throw an exception?
+ logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName,
+ existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName());
+ return existingOp;
+ });
+ }
+
+ this.name2operator = ImmutableMap.copyOf(map);
+ }
+
+ @Override
+ public String getName() {
+ return getFullName();
+ }
+
+ @Override
+ public Operator getOperator(String name) {
+ Operator operator = name2operator.get(name);
+ if (operator == null) {
+ throw new IllegalArgumentException("unknown operation " + getName() + "." + name);
+ }
+
+ return operator;
+ }
+
+ @Override
+ public Collection<Operator> getOperators() {
+ return name2operator.values();
+ }
+
+ @Override
+ public Set<String> getOperationNames() {
+ return name2operator.keySet();
+ }
+
+ /**
+ * For each operation, it looks for a set of parameters by the same name and, if
+ * found, configures the operation with the parameters.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ final String actorName = getName();
+ logger.info("configuring operations for actor {}", actorName);
+
+ BeanValidationResult valres = new BeanValidationResult(actorName, parameters);
+
+ // function that creates operator-specific parameters, given the operation name
+ Function<String, Map<String, Object>> opParamsMaker = makeOperatorParameters(parameters);
+
+ for (Operator operator : name2operator.values()) {
+ String operName = operator.getName();
+ Map<String, Object> subparams = opParamsMaker.apply(operName);
+
+ if (subparams != null) {
+
+ try {
+ operator.configure(subparams);
+
+ } catch (ParameterValidationRuntimeException e) {
+ logger.warn("failed to configure operation {}.{}", actorName, operName, e);
+ valres.addResult(e.getResult());
+
+ } catch (RuntimeException e) {
+ logger.warn("failed to configure operation {}.{}", actorName, operName, e);
+ }
+
+ } else if (operator.isConfigured()) {
+ logger.warn("missing configuration parameters for operation {}.{}; using previous parameters",
+ actorName, operName);
+
+ } else {
+ logger.warn("missing configuration parameters for operation {}.{}; operation cannot be started",
+ actorName, operName);
+ }
+ }
+ }
+
+ /**
+ * Extracts the operator parameters from the actor parameters, for a given operator.
+ * This method assumes each operation has its own set of parameters.
+ *
+ * @param actorParameters actor parameters
+ * @return a function to extract the operator parameters from the actor parameters.
+ * Note: this function may return {@code null} if there are no parameters for
+ * the given operation name
+ */
+ protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+
+ return operName -> Util.translateToMap(getName() + "." + operName, actorParameters.get(operName));
+ }
+
+ /**
+ * Starts each operation.
+ */
+ @Override
+ protected void doStart() {
+ final String actorName = getName();
+ logger.info("starting operations for actor {}", actorName);
+
+ for (Operator oper : name2operator.values()) {
+ if (oper.isConfigured()) {
+ Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName());
+
+ } else {
+ logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName());
+ }
+ }
+ }
+
+ /**
+ * Stops each operation.
+ */
+ @Override
+ protected void doStop() {
+ final String actorName = getName();
+ logger.info("stopping operations for actor {}", actorName);
+
+ // @formatter:off
+ name2operator.values().forEach(
+ oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName()));
+ // @formatter:on
+ }
+
+ /**
+ * Shuts down each operation.
+ */
+ @Override
+ protected void doShutdown() {
+ final String actorName = getName();
+ logger.info("shutting down operations for actor {}", actorName);
+
+ // @formatter:off
+ name2operator.values().forEach(oper -> Util.logException(oper::shutdown,
+ "failed to shutdown operation {}.{}", actorName, oper.getName()));
+ // @formatter:on
+ }
+
+ // TODO old code: remove lines down to **HERE**
+
+ @Override
+ public String actor() {
+ return null;
+ }
+
+ @Override
+ public List<String> recipes() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> recipeTargets(String recipe) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> recipePayloads(String recipe) {
+ return Collections.emptyList();
+ }
+
+ // **HERE**
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
new file mode 100644
index 000000000..80d8fbd04
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
@@ -0,0 +1,757 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import lombok.Getter;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Partial implementation of an operator. Subclasses can choose to simply implement
+ * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
+ * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
+ */
+public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
+
+ private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
+
+ private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
+ private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
+ private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
+
+ @Getter
+ private final String actorName;
+
+ @Getter
+ private final String name;
+
+ /**
+ * Constructs the object.
+ *
+ * @param actorName name of the actor with which this operator is associated
+ * @param name operation name
+ */
+ public OperatorPartial(String actorName, String name) {
+ super(actorName + "." + name);
+ this.actorName = actorName;
+ this.name = name;
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doStart() {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doStop() {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doShutdown() {
+ // do nothing
+ }
+
+ @Override
+ public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
+ if (!isAlive()) {
+ throw new IllegalStateException("operation is not running: " + getFullName());
+ }
+
+ final Executor executor = params.getExecutor();
+
+ // allocate a controller for the entire operation
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
+ if (preproc == null) {
+ // no preprocessor required - just start the operation
+ return startOperationAttempt(params, controller, 1);
+ }
+
+ // propagate "stop" to the preprocessor
+ controller.add(preproc);
+
+ /*
+ * Do preprocessor first and then, if successful, start the operation. Note:
+ * operations create their own outcome, ignoring the outcome from any previous
+ * steps.
+ */
+ preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
+ .thenComposeAsync(handleFailure(params, controller), executor)
+ .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
+ executor);
+
+ return controller;
+ }
+
+ /**
+ * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
+ * invokes the started and completed call-backs.
+ *
+ * @param params operation parameters
+ * @return a future that will return the preprocessor outcome, or {@code null} if this
+ * operation needs no preprocessor
+ */
+ protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
+ logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final ControlLoopOperation operation = params.makeOutcome();
+
+ final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
+ doPreprocessorAsFuture(params);
+ if (preproc == null) {
+ // no preprocessor required
+ return null;
+ }
+
+ // allocate a controller for the preprocessor steps
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ /*
+ * Don't mark it complete until we've built the whole pipeline. This will prevent
+ * the operation from starting until after it has been successfully built (i.e.,
+ * without generating any exceptions).
+ */
+ final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
+
+ // @formatter:off
+ firstFuture
+ .thenComposeAsync(controller.add(preproc), executor)
+ .exceptionally(fromException(params, operation))
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ // start the pipeline
+ firstFuture.complete(operation);
+
+ return controller;
+ }
+
+ /**
+ * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
+ * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
+ * outcome that it received.
+ *
+ * @param params operation parameters
+ * @param controller pipeline controller
+ * @return a function that checks the outcome status and continues, if successful, or
+ * indicates a failure otherwise
+ */
+ private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
+
+ return outcome -> {
+
+ if (outcome != null && isSuccess(outcome)) {
+ logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(outcome);
+ }
+
+ logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final CallbackManager callbacks = new CallbackManager();
+
+ // propagate "stop" to the callbacks
+ controller.add(callbacks);
+
+ final ControlLoopOperation outcome2 = params.makeOutcome();
+
+ // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
+
+ outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
+ outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
+
+ CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+
+ return controller;
+ };
+ }
+
+ /**
+ * Invokes the operation's preprocessor step(s) as a "future". This method simply
+ * returns {@code null}.
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @param params operation parameters
+ * @return a function that will start the preprocessor and returns its outcome, or
+ * {@code null} if this operation needs no preprocessor
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
+ ControlLoopOperationParams params) {
+ return null;
+ }
+
+ /**
+ * Starts the operation attempt, with no preprocessor. When all retries complete, it
+ * will complete the controller.
+ *
+ * @param params operation parameters
+ * @param controller controller for all operation attempts
+ * @param attempt attempt number, typically starting with 1
+ * @return a future that will return the final result of all attempts
+ */
+ private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
+ PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
+
+ final Executor executor = params.getExecutor();
+
+ CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
+
+ // propagate "stop" to the operation attempt
+ controller.add(future);
+
+ // detach when complete
+ future.whenCompleteAsync(controller.delayedRemove(future), executor)
+ .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+
+ return controller;
+ }
+
+ /**
+ * Starts the operation attempt, without doing any retries.
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @return a future that will return the result of a single operation attempt
+ */
+ private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
+ int attempt) {
+
+ logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final ControlLoopOperation outcome = params.makeOutcome();
+ final CallbackManager callbacks = new CallbackManager();
+
+ // this operation attempt gets its own controller
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ // propagate "stop" to the callbacks
+ controller.add(callbacks);
+
+ /*
+ * Don't mark it complete until we've built the whole pipeline. This will prevent
+ * the operation from starting until after it has been successfully built (i.e.,
+ * without generating any exceptions).
+ */
+ final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
+
+ // @formatter:off
+ CompletableFuture<ControlLoopOperation> future2 =
+ firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
+ .thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
+ // @formatter:on
+
+ // handle timeouts, if specified
+ long timeoutMillis = getTimeOutMillis(params.getPolicy());
+ if (timeoutMillis > 0) {
+ logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
+ future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /*
+ * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
+ * before callbackCompleted() is invoked.
+ *
+ * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
+ * the pipeline as the last step anyway.
+ */
+
+ // @formatter:off
+ future2.exceptionally(fromException(params, outcome))
+ .thenApplyAsync(setRetryFlag(params, attempt), executor)
+ .thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ // start the pipeline
+ firstFuture.complete(outcome);
+
+ return controller;
+ }
+
+ /**
+ * Determines if the outcome was successful.
+ *
+ * @param outcome outcome to examine
+ * @return {@code true} if the outcome was successful
+ */
+ protected boolean isSuccess(ControlLoopOperation outcome) {
+ return OUTCOME_SUCCESS.equals(outcome.getOutcome());
+ }
+
+ /**
+ * Determines if the outcome was a failure for this operator.
+ *
+ * @param outcome outcome to examine, or {@code null}
+ * @return {@code true} if the outcome is not {@code null} and was a failure
+ * <i>and</i> was associated with this operator, {@code false} otherwise
+ */
+ protected boolean isActorFailed(ControlLoopOperation outcome) {
+ return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
+ }
+
+ /**
+ * Invokes the operation as a "future". This method simply invokes
+ * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>verifyRunning() has been invoked</li>
+ * <li>callbackStarted() has been invoked</li>
+ * <li>the invoker will perform appropriate timeout checks</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @return a function that will start the operation and return its result when
+ * complete
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
+ ControlLoopOperationParams params, int attempt) {
+
+ /*
+ * TODO As doOperation() may perform blocking I/O, this should be launched in its
+ * own thread to prevent the ForkJoinPool from being tied up. Should probably
+ * provide a method to make that easy.
+ */
+
+ return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
+ params.getExecutor());
+ }
+
+ /**
+ * Low-level method that performs the operation. This can make the same assumptions
+ * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
+ * method throws an {@link UnsupportedOperationException}.
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @param operation the operation being performed
+ * @return the outcome of the operation
+ */
+ protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
+ ControlLoopOperation operation) {
+
+ throw new UnsupportedOperationException("start operation " + getFullName());
+ }
+
+ /**
+ * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
+ * FAILURE, assuming the policy specifies retries and the retry count has been
+ * exhausted.
+ *
+ * @param params operation parameters
+ * @param attempt latest attempt number, starting with 1
+ * @return a function to get the next future to execute
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
+ int attempt) {
+
+ return operation -> {
+ if (operation != null && !isActorFailed(operation)) {
+ /*
+ * wrong type or wrong operation - just leave it as is. No need to log
+ * anything here, as retryOnFailure() will log a message
+ */
+ return operation;
+ }
+
+ // get a non-null operation
+ ControlLoopOperation oper2;
+ if (operation != null) {
+ oper2 = operation;
+ } else {
+ oper2 = params.makeOutcome();
+ oper2.setOutcome(OUTCOME_FAILURE);
+ }
+
+ if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
+ && attempt > params.getPolicy().getRetry()) {
+ /*
+ * retries were specified and we've already tried them all - change to
+ * FAILURE_RETRIES
+ */
+ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
+ oper2.setOutcome(OUTCOME_RETRIES);
+ }
+
+ return oper2;
+ };
+ }
+
+ /**
+ * Restarts the operation if it was a FAILURE. Assumes that
+ * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
+ * thus that the "operation" is not {@code null}.
+ *
+ * @param params operation parameters
+ * @param controller controller for all of the retries
+ * @param attempt latest attempt number, starting with 1
+ * @return a function to get the next future to execute
+ */
+ private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
+ int attempt) {
+
+ return operation -> {
+ if (!isActorFailed(operation)) {
+ // wrong type or wrong operation - just leave it as is
+ logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+
+ if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
+ // no retries - already marked as FAILURE, so just return it
+ logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+
+
+ /*
+ * Retry the operation.
+ */
+ logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
+
+ return startOperationAttempt(params, controller, attempt + 1);
+ };
+ }
+
+ /**
+ * Gets the outcome of an operation for this operation.
+ *
+ * @param operation operation whose outcome is to be extracted
+ * @return the outcome of the given operation, if it's for this operator, {@code null}
+ * otherwise
+ */
+ protected String getActorOutcome(ControlLoopOperation operation) {
+ if (operation == null) {
+ return null;
+ }
+
+ if (!getActorName().equals(operation.getActor())) {
+ return null;
+ }
+
+ if (!getName().equals(operation.getOperation())) {
+ return null;
+ }
+
+ return operation.getOutcome();
+ }
+
+ /**
+ * Gets a function that will start the next step, if the current operation was
+ * successful, or just return the current operation, otherwise.
+ *
+ * @param params operation parameters
+ * @param nextStep function that will invoke the next step, passing it the operation
+ * @return a function that will start the next step
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
+ ControlLoopOperationParams params,
+ Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
+
+ return operation -> {
+
+ if (operation == null) {
+ logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
+ ControlLoopOperation outcome = params.makeOutcome();
+ outcome.setOutcome(OUTCOME_FAILURE);
+ return CompletableFuture.completedFuture(outcome);
+
+ } else if (isSuccess(operation)) {
+ logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
+ return nextStep.apply(operation);
+
+ } else {
+ logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+ };
+ }
+
+ /**
+ * Converts an exception into an operation outcome, returning a copy of the outcome to
+ * prevent background jobs from changing it.
+ *
+ * @param params operation parameters
+ * @param operation current operation
+ * @return a function that will convert an exception into an operation outcome
+ */
+ private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
+ ControlLoopOperation operation) {
+
+ return thrown -> {
+ logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
+ params.getRequestId(), thrown);
+
+ /*
+ * Must make a copy of the operation, as the original could be changed by
+ * background jobs that might still be running.
+ */
+ return setOutcome(params, new ControlLoopOperation(operation), thrown);
+ };
+ }
+
+ /**
+ * Gets a function to verify that the operation is still running. If the pipeline is
+ * not running, then it returns an incomplete future, which will effectively halt
+ * subsequent operations in the pipeline. This method is intended to be used with one
+ * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
+ *
+ * @param controller pipeline controller
+ * @param params operation parameters
+ * @return a function to verify that the operation is still running
+ */
+ protected <T> Function<T, CompletableFuture<T>> verifyRunning(
+ PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
+
+ return value -> {
+ boolean running = controller.isRunning();
+ logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
+
+ return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
+ };
+ }
+
+ /**
+ * Sets the start time of the operation and invokes the callback to indicate that the
+ * operation has started. Does nothing if the pipeline has been stopped.
+ * <p/>
+ * This assumes that the "outcome" is not {@code null}.
+ *
+ * @param params operation parameters
+ * @param callbacks used to determine if the start callback can be invoked
+ * @return a function that sets the start time and invokes the callback
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
+ CallbackManager callbacks) {
+
+ return outcome -> {
+
+ if (callbacks.canStart()) {
+ // haven't invoked "start" callback yet
+ outcome.setStart(callbacks.getStartTime());
+ outcome.setEnd(null);
+ params.callbackStarted(outcome);
+ }
+
+ return outcome;
+ };
+ }
+
+ /**
+ * Sets the end time of the operation and invokes the callback to indicate that the
+ * operation has completed. Does nothing if the pipeline has been stopped.
+ * <p/>
+ * This assumes that the "outcome" is not {@code null}.
+ * <p/>
+ * Note: the start time must be a reference rather than a plain value, because it's
+ * value must be gotten on-demand, when the returned function is executed at a later
+ * time.
+ *
+ * @param params operation parameters
+ * @param callbacks used to determine if the end callback can be invoked
+ * @return a function that sets the end time and invokes the callback
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
+ CallbackManager callbacks) {
+
+ return operation -> {
+
+ if (callbacks.canEnd()) {
+ operation.setStart(callbacks.getStartTime());
+ operation.setEnd(callbacks.getEndTime());
+ params.callbackCompleted(operation);
+ }
+
+ return operation;
+ };
+ }
+
+ /**
+ * Sets an operation's outcome and message, based on a throwable.
+ *
+ * @param params operation parameters
+ * @param operation operation to be updated
+ * @return the updated operation
+ */
+ protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ Throwable thrown) {
+ PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
+ return setOutcome(params, operation, result);
+ }
+
+ /**
+ * Sets an operation's outcome and default message based on the result.
+ *
+ * @param params operation parameters
+ * @param operation operation to be updated
+ * @param result result of the operation
+ * @return the updated operation
+ */
+ protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ PolicyResult result) {
+ logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
+ operation.setOutcome(result.toString());
+ operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
+ : ControlLoopOperation.FAILED_MSG);
+
+ return operation;
+ }
+
+ /**
+ * Determines if a throwable is due to a timeout.
+ *
+ * @param thrown throwable of interest
+ * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
+ */
+ protected boolean isTimeout(Throwable thrown) {
+ if (thrown instanceof CompletionException) {
+ thrown = thrown.getCause();
+ }
+
+ return (thrown instanceof TimeoutException);
+ }
+
+ // these may be overridden by junit tests
+
+ /**
+ * Gets the operation timeout. Subclasses may override this method to obtain the
+ * timeout in some other way (e.g., through configuration properties).
+ *
+ * @param policy policy from which to extract the timeout
+ * @return the operation timeout, in milliseconds
+ */
+ protected long getTimeOutMillis(Policy policy) {
+ Integer timeoutSec = policy.getTimeout();
+ return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Manager for "start" and "end" callbacks.
+ */
+ private static class CallbackManager implements Runnable {
+ private final AtomicReference<Instant> startTime = new AtomicReference<>();
+ private final AtomicReference<Instant> endTime = new AtomicReference<>();
+
+ /**
+ * Determines if the "start" callback can be invoked. If so, it sets the
+ * {@link #startTime} to the current time.
+ *
+ * @return {@code true} if the "start" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canStart() {
+ return startTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Determines if the "end" callback can be invoked. If so, it sets the
+ * {@link #endTime} to the current time.
+ *
+ * @return {@code true} if the "end" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canEnd() {
+ return endTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Gets the start time.
+ *
+ * @return the start time, or {@code null} if {@link #canStart()} has not been
+ * invoked yet.
+ */
+ public Instant getStartTime() {
+ return startTime.get();
+ }
+
+ /**
+ * Gets the end time.
+ *
+ * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
+ * yet.
+ */
+ public Instant getEndTime() {
+ return endTime.get();
+ }
+
+ /**
+ * Prevents further callbacks from being executed by setting {@link #startTime}
+ * and {@link #endTime}.
+ */
+ @Override
+ public void run() {
+ canStart();
+ canEnd();
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java
new file mode 100644
index 000000000..6c883f1b5
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java
@@ -0,0 +1,153 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import lombok.Getter;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Partial implementation of an object that is both startable and configurable. It
+ * provides the high level methods defined in the interface, while deferring the details
+ * to abstract methods that must be provided by the subclasses. It also manages the
+ * current {@link #state}.
+ *
+ * @param <T> type of parameters expected by {@link #configure(Object)}
+ */
+public abstract class StartConfigPartial<T> implements Startable, Configurable<T> {
+ private static final Logger logger = LoggerFactory.getLogger(StartConfigPartial.class);
+
+ @Getter
+ private final String fullName;
+
+ public enum State {
+ IDLE, CONFIGURED, ALIVE
+ }
+
+ private State state = State.IDLE;
+
+ /**
+ * Constructs the object.
+ *
+ * @param fullName full name of this object, used for logging and exception purposes
+ */
+ public StartConfigPartial(String fullName) {
+ this.fullName = fullName;
+ }
+
+ @Override
+ public synchronized boolean isAlive() {
+ return (state == State.ALIVE);
+ }
+
+ /**
+ * Determines if this object has been configured.
+ *
+ * @return {@code true} if this object has been configured, {@code false} otherwise
+ */
+ public synchronized boolean isConfigured() {
+ return (state != State.IDLE);
+ }
+
+ @Override
+ public synchronized void configure(T parameters) {
+ if (isAlive()) {
+ throw new IllegalStateException("attempt to reconfigure, but already running " + getFullName());
+ }
+
+ logger.info("initializing {}", getFullName());
+
+ doConfigure(parameters);
+
+ state = State.CONFIGURED;
+ }
+
+ @Override
+ public synchronized boolean start() {
+ switch (state) {
+ case ALIVE:
+ logger.info("{} is already running", getFullName());
+ break;
+
+ case CONFIGURED:
+ logger.info("starting {}", getFullName());
+ doStart();
+ state = State.ALIVE;
+ break;
+
+ case IDLE:
+ default:
+ throw new IllegalStateException("attempt to start unconfigured " + getFullName());
+ }
+
+ return true;
+ }
+
+ @Override
+ public synchronized boolean stop() {
+ if (isAlive()) {
+ logger.info("stopping {}", getFullName());
+ state = State.CONFIGURED;
+ doStop();
+
+ } else {
+ logger.info("{} is not running", getFullName());
+ }
+
+ return true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (!isAlive()) {
+ logger.info("{} is not running", getFullName());
+ return;
+ }
+
+ logger.info("shutting down actor {}", getFullName());
+ state = State.CONFIGURED;
+ doShutdown();
+ }
+
+ /**
+ * Configures this object.
+ *
+ * @param parameters configuration parameters
+ */
+ protected abstract void doConfigure(T parameters);
+
+ /**
+ * Starts this object.
+ */
+ protected abstract void doStart();
+
+ /**
+ * Stops this object.
+ */
+ protected abstract void doStop();
+
+ /**
+ * Shuts down this object.
+ */
+ protected abstract void doShutdown();
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
new file mode 100644
index 000000000..08aba81f2
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
@@ -0,0 +1,210 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Consumer;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.annotations.NotNull;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.policy.Policy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Parameters for control loop operations. The executor defaults to
+ * {@link ForkJoinPool#commonPool()}, but may be overridden.
+ */
+@Getter
+@Builder(toBuilder = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+public class ControlLoopOperationParams {
+
+ private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class);
+
+ public static final String UNKNOWN = "-unknown-";
+
+
+ /**
+ * The actor service in which to find the actor/operation.
+ */
+ @NotNull
+ private ActorService actorService;
+
+ /**
+ * The event for which the operation applies.
+ */
+ @NotNull
+ private ControlLoopEventContext context;
+
+ /**
+ * The executor to use to run the operation.
+ */
+ @NotNull
+ @Builder.Default
+ private Executor executor = ForkJoinPool.commonPool();
+
+ /**
+ * The policy associated with the operation.
+ */
+ @NotNull
+ private Policy policy;
+
+ /**
+ * The function to invoke when the operation starts. This is optional.
+ * <p/>
+ * Note: this may be invoked multiple times, but with different actor/operations. That
+ * may happen if the current operation requires other operations to be performed first
+ * (e.g., A&AI queries, guard checks).
+ */
+ private Consumer<ControlLoopOperation> startCallback;
+
+ /**
+ * The function to invoke when the operation completes. This is optional.
+ * <p/>
+ * Note: this may be invoked multiple times, but with different actor/operations. That
+ * may happen if the current operation requires other operations to be performed first
+ * (e.g., A&AI queries, guard checks).
+ */
+ private Consumer<ControlLoopOperation> completeCallback;
+
+ /**
+ * Target entity.
+ */
+ @NotNull
+ private String target;
+
+ /**
+ * Starts the specified operation.
+ *
+ * @return a future that will return the result of the operation
+ * @throws IllegalArgumentException if the parameters are invalid
+ */
+ public CompletableFuture<ControlLoopOperation> start() {
+ BeanValidationResult result = validate();
+ if (!result.isValid()) {
+ logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(),
+ result.getResult());
+ throw new IllegalArgumentException("invalid parameters");
+ }
+
+ // @formatter:off
+ return actorService
+ .getActor(policy.getActor())
+ .getOperator(policy.getRecipe())
+ .startOperation(this);
+ // @formatter:on
+ }
+
+ /**
+ * Gets the name of the actor from the policy.
+ *
+ * @return the actor name, or {@link #UNKNOWN} if no name is available
+ */
+ public String getActor() {
+ return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor());
+ }
+
+ /**
+ * Gets the name of the operation from the policy.
+ *
+ * @return the operation name, or {@link #UNKNOWN} if no name is available
+ */
+ public String getOperation() {
+ return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe());
+ }
+
+ /**
+ * Gets the requested ID of the associated event.
+ *
+ * @return the event's request ID, or {@code null} if no request ID is available
+ */
+ public UUID getRequestId() {
+ return (context == null || context.getEvent() == null ? null : context.getEvent().getRequestId());
+ }
+
+ /**
+ * Makes an operation outcome, populating it from the parameters.
+ *
+ * @return a new operation outcome
+ */
+ public ControlLoopOperation makeOutcome() {
+ ControlLoopOperation operation = new ControlLoopOperation();
+ operation.setActor(getActor());
+ operation.setOperation(getOperation());
+ operation.setTarget(target);
+
+ return operation;
+ }
+
+ /**
+ * Invokes the callback to indicate that the operation has started. Any exceptions
+ * generated by the callback are logged, but not re-thrown.
+ *
+ * @param operation the operation that is being started
+ */
+ public void callbackStarted(ControlLoopOperation operation) {
+ logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId());
+
+ if (startCallback != null) {
+ Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}",
+ operation.getActor(), operation.getOperation(), getRequestId());
+ }
+ }
+
+ /**
+ * Invokes the callback to indicate that the operation has completed. Any exceptions
+ * generated by the callback are logged, but not re-thrown.
+ *
+ * @param operation the operation that is being started
+ */
+ public void callbackCompleted(ControlLoopOperation operation) {
+ logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(),
+ operation.getOutcome(), getRequestId());
+
+ if (completeCallback != null) {
+ Util.logException(() -> completeCallback.accept(operation),
+ "{}.{}: complete-callback threw an exception for {}", operation.getActor(),
+ operation.getOperation(), getRequestId());
+ }
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @return the validation result
+ */
+ public BeanValidationResult validate() {
+ return new BeanValidator().validateTop(ControlLoopOperationParams.class.getSimpleName(), this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
new file mode 100644
index 000000000..da4fb4f0c
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import java.util.Map;
+import java.util.function.Function;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+/**
+ * Parameters used by Actors that connect to a server via HTTP. This contains the
+ * parameters that are common to all of the operations. Only the path changes for each
+ * operation, thus it includes a mapping from operation name to path.
+ */
+@Data
+@NotNull
+@NotBlank
+public class HttpActorParams {
+
+ /**
+ * Name of the HttpClient, as found in the HttpClientFactory.
+ */
+ private String clientName;
+
+ /**
+ * Amount of time, in seconds to wait for the HTTP request to complete, where zero
+ * indicates that it should wait forever. The default is zero.
+ */
+ @Min(0)
+ private long timeoutSec = 0;
+
+ /**
+ * Maps the operation name to its URI path.
+ */
+ private Map<String, String> path;
+
+ /**
+ * Extracts a specific operation's parameters from "this".
+ *
+ * @param name name of the item containing "this"
+ * @return a function to extract an operation's parameters from "this". Note: the
+ * returned function is not thread-safe
+ */
+ public Function<String, Map<String, Object>> makeOperationParameters(String name) {
+ HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build();
+
+ return operation -> {
+ String subpath = path.get(operation);
+ if (subpath == null) {
+ return null;
+ }
+
+ subparams.setPath(subpath);
+ return Util.translateToMap(name + "." + operation, subparams);
+ };
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @param name name of the object containing these parameters
+ * @return "this"
+ * @throws IllegalArgumentException if the parameters are invalid
+ */
+ public HttpActorParams doValidation(String name) {
+ ValidationResult result = validate(name);
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+
+ return this;
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
+
+ result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
+
+ return result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
new file mode 100644
index 000000000..695ffe4dd
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Builder;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+/**
+ * Parameters used by Operators that connect to a server via HTTP.
+ */
+@NotNull
+@NotBlank
+@Data
+@Builder(toBuilder = true)
+public class HttpParams {
+
+ /**
+ * Name of the HttpClient, as found in the HttpClientFactory.
+ */
+ private String clientName;
+
+ /**
+ * URI path.
+ */
+ private String path;
+
+ /**
+ * Amount of time, in seconds to wait for the HTTP request to complete, where zero
+ * indicates that it should wait forever. The default is zero.
+ */
+ @Min(0)
+ @Builder.Default
+ private long timeoutSec = 0;
+
+
+ /**
+ * Validates the parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ return new BeanValidator().validateTop(resultName, this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java
new file mode 100644
index 000000000..3004e1932
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Getter;
+import org.onap.policy.common.parameters.ValidationResult;
+
+/**
+ * Parameter runtime exception, with an associated validation result. This is used to
+ * throw an exception while passing a validation result up the chain.
+ * <p/>
+ * Note: the validation result is <i>not</i> included in the exception message.
+ */
+public class ParameterValidationRuntimeException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ @Getter
+ private final transient ValidationResult result;
+
+
+ public ParameterValidationRuntimeException(ValidationResult result) {
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(String message, ValidationResult result) {
+ super(message);
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(Throwable cause, ValidationResult result) {
+ super(cause);
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(String message, Throwable cause, ValidationResult result) {
+ super(message, cause);
+ this.result = result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java
new file mode 100644
index 000000000..9e6d8a15e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Builder;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+/**
+ * Parameters used by Operators that connect to a server via DMaaP.
+ */
+@NotNull
+@NotBlank
+@Data
+@Builder(toBuilder = true)
+public class TopicParams {
+
+ /**
+ * Name of the target topic end point to which requests should be published.
+ */
+ private String target;
+
+ /**
+ * Source topic end point, from which to read responses.
+ */
+ private String source;
+
+ /**
+ * Amount of time, in seconds to wait for the response, where zero indicates that it
+ * should wait forever. The default is zero.
+ */
+ @Min(0)
+ @Builder.Default
+ private long timeoutSec = 0;
+
+ /**
+ * Validates both the publisher and the subscriber parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ return new BeanValidator().validateTop(resultName, this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java
new file mode 100644
index 000000000..aac2f77b7
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.Future;
+import lombok.NoArgsConstructor;
+
+/**
+ * Manager that manages both futures and listeners. When {@link #stop()} is called, the
+ * listeners are executed and the futures are canceled. The various methods synchronize on
+ * "this" while they manipulate internal data structures.
+ */
+@NoArgsConstructor
+public class FutureManager extends ListenerManager {
+
+ /**
+ * Maps a future to its listener. Records the {@link Runnable} that is passed to
+ * {@link ListenerManager#add(Runnable)} when {@link #add(Future)} is invoked. This is
+ * needed if {@link #remove(Future)} is invoked, so that the same {@link Runnable} is
+ * used each time.
+ */
+ @SuppressWarnings("rawtypes")
+ private final IdentityHashMap<Future, Runnable> future2listener = new IdentityHashMap<>(5);
+
+ /**
+ * Adds a future that is to be canceled when this controller is stopped. Note: if the
+ * controller is already stopped, then the future will be canceled immediately, within
+ * the invoking thread.
+ *
+ * @param future future to be added
+ */
+ public <T> void add(Future<T> future) {
+ Runnable listener = () -> future.cancel(false);
+
+ synchronized (this) {
+ if (future2listener.putIfAbsent(future, listener) != null) {
+ // this future is already in the map, nothing more to do
+ return;
+ }
+
+ if (addOnly(listener)) {
+ // successfully added
+ return;
+ }
+ }
+
+ runListener(listener);
+ }
+
+ /**
+ * Removes a future so that it is not canceled when this controller is stopped.
+ *
+ * @param future future to be removed
+ */
+ public synchronized <T> void remove(Future<T> future) {
+ Runnable listener = future2listener.remove(future);
+ if (listener != null) {
+ remove(listener);
+ }
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+
+ synchronized (this) {
+ future2listener.clear();
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
new file mode 100644
index 000000000..d34a3fb5b
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+/**
+ * Listener manager, used by operations within the pipeline to determine if they should
+ * continue to run. When {@link #stop()} is called, the listeners are executed. The
+ * various methods synchronize on "this" while they manipulate internal data structures.
+ */
+@NoArgsConstructor
+public class ListenerManager {
+
+ @Getter
+ private volatile boolean running = true;
+
+ /**
+ * Listeners to be executed when {@link #stop()} is invoked.
+ */
+ private final IdentityHashMap<Runnable, Void> listeners = new IdentityHashMap<>(5);
+
+ /**
+ * Indicates that operations within the pipeline should stop executing.
+ */
+ public void stop() {
+ ArrayList<Runnable> items;
+
+ synchronized (this) {
+ if (!running) {
+ return;
+ }
+
+ running = false;
+ items = new ArrayList<>(listeners.keySet());
+ listeners.clear();
+ }
+
+ items.forEach(this::runListener);
+ }
+
+ /**
+ * Adds a listener that is to be invoked when this controller is stopped. Note: if the
+ * controller is already stopped, then the listener will be invoked immediately,
+ * within the invoking thread.
+ *
+ * @param listener listener to be added
+ */
+ public void add(Runnable listener) {
+ if (!addOnly(listener)) {
+ runListener(listener);
+ }
+ }
+
+ /**
+ * Adds a listener that is to be invoked when this controller is stopped. Note: if the
+ * controller is already stopped, then the listener will be invoked immediately,
+ * within the invoking thread.
+ *
+ * @param listener listener to be added
+ * @return {@code true} if the the listener was added, {@code false} if it could not
+ * be added because this manager has already been stopped
+ */
+ protected boolean addOnly(Runnable listener) {
+ synchronized (this) {
+ if (running) {
+ listeners.put(listener, null);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Runs a listener, catching any exceptions that it may throw.
+ *
+ * @param listener listener to be executed
+ */
+ protected void runListener(Runnable listener) {
+ // TODO do this asynchronously?
+ Util.logException(listener, "pipeline listener {} threw an exception", listener);
+ }
+
+ /**
+ * Removes a listener so that it is not invoked when this controller is stopped.
+ *
+ * @param listener listener to be removed
+ */
+ public synchronized void remove(Runnable listener) {
+ listeners.remove(listener);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
new file mode 100644
index 000000000..96c8f9e05
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
@@ -0,0 +1,157 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline controller, used by operations within the pipeline to determine if they should
+ * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
+ * pipeline.
+ */
+@NoArgsConstructor
+public class PipelineControllerFuture<T> extends CompletableFuture<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
+
+ /**
+ * Tracks items added to this controller via one of the <i>add</i> methods.
+ */
+ private final FutureManager futures = new FutureManager();
+
+
+ /**
+ * Cancels and stops the pipeline, in that order.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ try {
+ logger.trace("{}: cancel future", ident(this));
+ return super.cancel(mayInterruptIfRunning);
+
+ } finally {
+ futures.stop();
+ }
+ }
+
+ /**
+ * Generates a function that, when invoked, will remove the given future. This is
+ * typically added onto the end of a pipeline via one of the
+ * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that removes the given future
+ */
+ public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
+ return (value, thrown) -> {
+ logger.trace("{}: remove future {}", ident(this), ident(future));
+ remove(future);
+ };
+ }
+
+ /**
+ * Generates a function that, when invoked, will remove the given listener. This is
+ * typically added onto the end of a pipeline via one of the
+ * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that removes the given listener
+ */
+ public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
+ return (value, thrown) -> {
+ logger.trace("{}: remove listener {}", ident(this), ident(listener));
+ remove(listener);
+ };
+ }
+
+ /**
+ * Generates a function that, when invoked, will stop all pipeline listeners and
+ * complete this future. This is typically added onto the end of a pipeline via one of
+ * the {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that stops all pipeline listeners
+ */
+ public BiConsumer<T, Throwable> delayedComplete() {
+ return (value, thrown) -> {
+ if (thrown == null) {
+ logger.trace("{}: complete and stop future", ident(this));
+ complete(value);
+ } else {
+ logger.trace("{}: complete exceptionally and stop future", ident(this));
+ completeExceptionally(thrown);
+ }
+
+ futures.stop();
+ };
+ }
+
+ /**
+ * Adds a function whose return value is to be canceled when this controller is
+ * stopped. Note: if the controller is already stopped, then the function will
+ * <i>not</i> be executed.
+ *
+ * @param futureMaker function to be invoked in the future
+ */
+ public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
+
+ return input -> {
+ if (!isRunning()) {
+ logger.trace("{}: discarded new future", ident(this));
+ return new CompletableFuture<>();
+ }
+
+ CompletableFuture<F> future = futureMaker.apply(input);
+ add(future);
+
+ return future;
+ };
+ }
+
+ public <F> void add(Future<F> future) {
+ logger.trace("{}: add future {}", ident(this), ident(future));
+ futures.add(future);
+ }
+
+ public void add(Runnable listener) {
+ logger.trace("{}: add listener {}", ident(this), ident(listener));
+ futures.add(listener);
+ }
+
+ public boolean isRunning() {
+ return futures.isRunning();
+ }
+
+ public <F> void remove(Future<F> future) {
+ logger.trace("{}: remove future {}", ident(this), ident(future));
+ futures.remove(future);
+ }
+
+ public void remove(Runnable listener) {
+ logger.trace("{}: remove listener {}", ident(this), ident(listener));
+ futures.remove(listener);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
index 88f3c16eb..620950a3c 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Actor
* ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,9 +21,56 @@
package org.onap.policy.controlloop.actorserviceprovider.spi;
+import java.util.Collection;
+
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+
+/**
+ * This is the service interface for defining an Actor used in Control Loop Operational
+ * Policies for performing actions on runtime entities.
+ *
+ * @author pameladragosh
+ *
+ */
+public interface Actor extends Startable, Configurable<Map<String,Object>> {
+
+ /**
+ * Gets the name of the actor.
+ *
+ * @return the actor name
+ */
+ String getName();
+
+ /**
+ * Gets a particular operator.
+ *
+ * @param name name of the operation of interest
+ * @return the desired operation
+ * @throws IllegalArgumentException if no operation by the given name exists
+ */
+ Operator getOperator(String name);
+
+ /**
+ * Gets the supported operations.
+ *
+ * @return the supported operations
+ */
+ public Collection<Operator> getOperators();
+
+ /**
+ * Gets the names of the supported operations.
+ *
+ * @return the names of the supported operations
+ */
+ public Set<String> getOperationNames();
+
-public interface Actor {
+ // TODO old code: remove lines down to **HERE**
String actor();
@@ -33,4 +80,5 @@ public interface Actor {
List<String> recipePayloads(String recipe);
+ // **HERE**
}