summaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors/actorServiceProvider/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java167
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java65
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java58
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java114
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java90
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java239
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java757
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java153
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java210
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java112
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java69
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java57
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java68
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java89
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java115
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java157
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java52
17 files changed, 2546 insertions, 26 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
index 809936146..13f09b1ad 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ActorService
* ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,25 +21,56 @@
package org.onap.policy.controlloop.actorserviceprovider;
-import com.google.common.collect.ImmutableList;
-
-import java.util.Iterator;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.ServiceLoader;
-
+import java.util.Set;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.impl.StartConfigPartial;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ActorService {
-
+/**
+ * Service that manages a set of actors. To use the service, first invoke
+ * {@link #configure(Map)} to configure all of the actors, and then invoke
+ * {@link #start()} to start all of the actors. When finished using the actor service,
+ * invoke {@link #stop()} or {@link #shutdown()}.
+ */
+public class ActorService extends StartConfigPartial<Map<String, Object>> {
private static final Logger logger = LoggerFactory.getLogger(ActorService.class);
- private static ActorService service;
- // USed to load actors
- private final ServiceLoader<Actor> loader;
+ private final Map<String, Actor> name2actor;
+
+ private static class LazyHolder {
+ static final ActorService INSTANCE = new ActorService();
+ }
+
+ /**
+ * Constructs the object and loads the list of actors.
+ */
+ protected ActorService() {
+ super("actors");
+
+ Map<String, Actor> map = new HashMap<>();
+
+ for (Actor newActor : loadActors()) {
+ map.compute(newActor.getName(), (name, existingActor) -> {
+ if (existingActor == null) {
+ return newActor;
+ }
+
+ // TODO: should this throw an exception?
+ logger.warn("duplicate actor names for {}: {}, ignoring {}", name,
+ existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName());
+ return existingActor;
+ });
+ }
- private ActorService() {
- loader = ServiceLoader.load(Actor.class);
+ name2actor = ImmutableMap.copyOf(map);
}
/**
@@ -47,27 +78,115 @@ public class ActorService {
*
* @return the instance
*/
- public static synchronized ActorService getInstance() {
- if (service == null) {
- service = new ActorService();
+ public static ActorService getInstance() {
+ return LazyHolder.INSTANCE;
+ }
+
+ /**
+ * Gets a particular actor.
+ *
+ * @param name name of the actor of interest
+ * @return the desired actor
+ * @throws IllegalArgumentException if no actor by the given name exists
+ */
+ public Actor getActor(String name) {
+ Actor actor = name2actor.get(name);
+ if (actor == null) {
+ throw new IllegalArgumentException("unknown actor " + name);
}
- return service;
+
+ return actor;
}
/**
- * Get the actors.
+ * Gets the actors.
*
* @return the actors
*/
- public ImmutableList<Actor> actors() {
- Iterator<Actor> iter = loader.iterator();
- logger.debug("returning actors");
- while (iter.hasNext()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Got {}", iter.next().actor());
+ public Collection<Actor> getActors() {
+ return name2actor.values();
+ }
+
+ /**
+ * Gets the names of the actors.
+ *
+ * @return the actor names
+ */
+ public Set<String> getActorNames() {
+ return name2actor.keySet();
+ }
+
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ logger.info("configuring actors");
+
+ BeanValidationResult valres = new BeanValidationResult("ActorService", parameters);
+
+ for (Actor actor : name2actor.values()) {
+ String actorName = actor.getName();
+ Map<String, Object> subparams = Util.translateToMap(actorName, parameters.get(actorName));
+
+ if (subparams != null) {
+
+ try {
+ actor.configure(subparams);
+
+ } catch (ParameterValidationRuntimeException e) {
+ logger.warn("failed to configure actor {}", actorName, e);
+ valres.addResult(e.getResult());
+
+ } catch (RuntimeException e) {
+ logger.warn("failed to configure actor {}", actorName, e);
+ }
+
+ } else if (actor.isConfigured()) {
+ logger.warn("missing configuration parameters for actor {}; using previous parameters", actorName);
+
+ } else {
+ logger.warn("missing configuration parameters for actor {}; actor cannot be started", actorName);
}
}
- return ImmutableList.copyOf(loader.iterator());
+ if (!valres.isValid() && logger.isWarnEnabled()) {
+ logger.warn("actor services validation errors:\n{}", valres.getResult());
+ }
+ }
+
+ @Override
+ protected void doStart() {
+ logger.info("starting actors");
+
+ for (Actor actor : name2actor.values()) {
+ if (actor.isConfigured()) {
+ Util.logException(actor::start, "failed to start actor {}", actor.getName());
+
+ } else {
+ logger.warn("not starting unconfigured actor {}", actor.getName());
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ logger.info("stopping actors");
+ name2actor.values()
+ .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName()));
+ }
+
+ @Override
+ protected void doShutdown() {
+ logger.info("shutting down actors");
+
+ // @formatter:off
+ name2actor.values().forEach(
+ actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName()));
+
+ // @formatter:on
+ }
+
+ // the following methods may be overridden by junit tests
+
+ protected Iterable<Actor> loadActors() {
+ return ServiceLoader.load(Actor.class);
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java
new file mode 100644
index 000000000..b7a9a53ad
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import lombok.AllArgsConstructor;
+
+/**
+ * Object whose {@link #toString()} method invokes {@link Object#toString()} on another
+ * object, on-demand. This assumes that the other object's method returns an object
+ * identifier. This is typically used to include an object's identifier in a log message.
+ */
+@AllArgsConstructor
+public class DelayedIdentString {
+ /**
+ * String to return for null objects or null object identifiers.
+ */
+ public static final String NULL_STRING = "null";
+
+ private final Object object;
+
+ /**
+ * Gets the object's identifier, after stripping anything appearing before '@'.
+ */
+ @Override
+ public String toString() {
+ if (object == null) {
+ return NULL_STRING;
+ }
+
+ String ident = objectToString();
+ if (ident == null) {
+ return NULL_STRING;
+ }
+
+ int index = ident.indexOf('@');
+ return (index > 0 ? ident.substring(index) : ident);
+ }
+
+ /**
+ * Invokes the object's {@link Object#toString()} method.
+ *
+ * @return the output from the object's {@link Object#toString()} method
+ */
+ protected String objectToString() {
+ return object.toString();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
new file mode 100644
index 000000000..e308ee42e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+/**
+ * This is the service interface for defining an Actor operation used in Control Loop
+ * Operational Policies for performing actions on runtime entities.
+ */
+public interface Operator extends Startable, Configurable<Map<String, Object>> {
+
+ /**
+ * Gets the name of the associated actor.
+ *
+ * @return the name of the associated actor
+ */
+ String getActorName();
+
+ /**
+ * Gets the name of the operation.
+ *
+ * @return the operation name
+ */
+ String getName();
+
+ /**
+ * Called by enforcement PDP engine to start the operation. As part of the operation,
+ * it invokes the "start" and "complete" call-backs found within the parameters.
+ *
+ * @param params parameters needed to start the operation
+ * @return a future that can be used to cancel or await the result of the operation
+ */
+ CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params);
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
new file mode 100644
index 000000000..0aba1a7fa
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
@@ -0,0 +1,114 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Actor utilities.
+ */
+public class Util {
+ private static final Logger logger = LoggerFactory.getLogger(Util.class);
+
+ private Util() {
+ // do nothing
+ }
+
+ /**
+ * Extracts an object's identity by invoking {@link Object#toString()} and returning
+ * the portion starting with "@". Extraction is done on-demand, when toString() is
+ * called on the result. This is typically used when logging.
+ *
+ * @param object object whose identity is to be extracted
+ * @return an object that will extract the source object's identity when this object's
+ * toString() method is called
+ */
+ public static Object ident(Object object) {
+ return new DelayedIdentString(object);
+ }
+
+ /**
+ * Runs a function and logs a message if it throws an exception. Does <i>not</i>
+ * re-throw the exception.
+ *
+ * @param function function to be run
+ * @param exceptionMessage message to log if an exception is thrown
+ * @param exceptionArgs arguments to be passed to the logger
+ */
+ public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) {
+ try {
+ function.run();
+
+ } catch (RuntimeException ex) {
+ // create a new array containing the original arguments plus the exception
+ Object[] allArgs = Arrays.copyOf(exceptionArgs, exceptionArgs.length + 1);
+ allArgs[exceptionArgs.length] = ex;
+
+ logger.warn(exceptionMessage, allArgs);
+ }
+ }
+
+ /**
+ * Translates parameters from one class to another, typically from a Map to a POJO or
+ * vice versa.
+ *
+ * @param identifier identifier of the actor/operation being translated; used to build
+ * an exception message
+ * @param source source object to be translated
+ * @param clazz target class
+ * @return the translated object
+ */
+ public static <T> T translate(String identifier, Object source, Class<T> clazz) {
+ Coder coder = new StandardCoder();
+
+ try {
+ String json = coder.encode(source);
+ return coder.decode(json, clazz);
+
+ } catch (CoderException | RuntimeException e) {
+ throw new IllegalArgumentException("cannot translate parameters for " + identifier, e);
+ }
+ }
+
+ /**
+ * Translates parameters to a Map.
+ *
+ * @param identifier identifier of the actor/operation being translated; used to build
+ * an exception message
+ * @param source source parameters
+ * @return the parameters, as a Map
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> translateToMap(String identifier, Object source) {
+ if (source == null) {
+ return null;
+ }
+
+ return translate(identifier, source, LinkedHashMap.class);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
new file mode 100644
index 000000000..68bbe7edc
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
@@ -0,0 +1,90 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.controlloop;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.aai.AaiCqResponse;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+
+/**
+ * Context associated with a control loop event.
+ */
+@Getter
+@Setter
+public class ControlLoopEventContext implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Setter(AccessLevel.NONE)
+ private final VirtualControlLoopEvent event;
+
+ private AaiCqResponse aaiCqResponse;
+
+ // TODO may remove this if it proves not to be needed
+ @Getter(AccessLevel.NONE)
+ @Setter(AccessLevel.NONE)
+ private Map<String, Serializable> properties = new ConcurrentHashMap<>();
+
+ /**
+ * Constructs the object.
+ *
+ * @param event event with which this is associated
+ */
+ public ControlLoopEventContext(VirtualControlLoopEvent event) {
+ this.event = event;
+ }
+
+ /**
+ * Determines if the context contains a property.
+ *
+ * @param name name of the property of interest
+ * @return {@code true} if the context contains the property, {@code false} otherwise
+ */
+ public boolean contains(String name) {
+ return properties.containsKey(name);
+ }
+
+ /**
+ * Gets a property, casting it to the desired type.
+ *
+ * @param <T> desired type
+ * @param name name of the property whose value is to be retrieved
+ * @return the property's value, or {@code null} if it does not yet have a value
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T getProperty(String name) {
+ return (T) properties.get(name);
+ }
+
+ /**
+ * Sets a property's value.
+ *
+ * @param name property name
+ * @param value new property value
+ */
+ public void setProperty(String name, Serializable value) {
+ properties.put(name, value);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
new file mode 100644
index 000000000..9b9aa914e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
@@ -0,0 +1,239 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of an actor.
+ */
+public class ActorImpl extends StartConfigPartial<Map<String, Object>> implements Actor {
+ private static final Logger logger = LoggerFactory.getLogger(ActorImpl.class);
+
+ /**
+ * Maps a name to an operator.
+ */
+ private Map<String, Operator> name2operator;
+
+ /**
+ * Constructs the object.
+ *
+ * @param name actor name
+ * @param operators the operations supported by this actor
+ */
+ public ActorImpl(String name, Operator... operators) {
+ super(name);
+ setOperators(Arrays.asList(operators));
+ }
+
+ /**
+ * Sets the operators supported by this actor, overriding any previous list.
+ *
+ * @param operators the operations supported by this actor
+ */
+ protected void setOperators(List<Operator> operators) {
+ if (isConfigured()) {
+ throw new IllegalStateException("attempt to set operators on a configured actor: " + getName());
+ }
+
+ Map<String, Operator> map = new HashMap<>();
+ for (Operator newOp : operators) {
+ map.compute(newOp.getName(), (opName, existingOp) -> {
+ if (existingOp == null) {
+ return newOp;
+ }
+
+ // TODO: should this throw an exception?
+ logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName,
+ existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName());
+ return existingOp;
+ });
+ }
+
+ this.name2operator = ImmutableMap.copyOf(map);
+ }
+
+ @Override
+ public String getName() {
+ return getFullName();
+ }
+
+ @Override
+ public Operator getOperator(String name) {
+ Operator operator = name2operator.get(name);
+ if (operator == null) {
+ throw new IllegalArgumentException("unknown operation " + getName() + "." + name);
+ }
+
+ return operator;
+ }
+
+ @Override
+ public Collection<Operator> getOperators() {
+ return name2operator.values();
+ }
+
+ @Override
+ public Set<String> getOperationNames() {
+ return name2operator.keySet();
+ }
+
+ /**
+ * For each operation, it looks for a set of parameters by the same name and, if
+ * found, configures the operation with the parameters.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ final String actorName = getName();
+ logger.info("configuring operations for actor {}", actorName);
+
+ BeanValidationResult valres = new BeanValidationResult(actorName, parameters);
+
+ // function that creates operator-specific parameters, given the operation name
+ Function<String, Map<String, Object>> opParamsMaker = makeOperatorParameters(parameters);
+
+ for (Operator operator : name2operator.values()) {
+ String operName = operator.getName();
+ Map<String, Object> subparams = opParamsMaker.apply(operName);
+
+ if (subparams != null) {
+
+ try {
+ operator.configure(subparams);
+
+ } catch (ParameterValidationRuntimeException e) {
+ logger.warn("failed to configure operation {}.{}", actorName, operName, e);
+ valres.addResult(e.getResult());
+
+ } catch (RuntimeException e) {
+ logger.warn("failed to configure operation {}.{}", actorName, operName, e);
+ }
+
+ } else if (operator.isConfigured()) {
+ logger.warn("missing configuration parameters for operation {}.{}; using previous parameters",
+ actorName, operName);
+
+ } else {
+ logger.warn("missing configuration parameters for operation {}.{}; operation cannot be started",
+ actorName, operName);
+ }
+ }
+ }
+
+ /**
+ * Extracts the operator parameters from the actor parameters, for a given operator.
+ * This method assumes each operation has its own set of parameters.
+ *
+ * @param actorParameters actor parameters
+ * @return a function to extract the operator parameters from the actor parameters.
+ * Note: this function may return {@code null} if there are no parameters for
+ * the given operation name
+ */
+ protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+
+ return operName -> Util.translateToMap(getName() + "." + operName, actorParameters.get(operName));
+ }
+
+ /**
+ * Starts each operation.
+ */
+ @Override
+ protected void doStart() {
+ final String actorName = getName();
+ logger.info("starting operations for actor {}", actorName);
+
+ for (Operator oper : name2operator.values()) {
+ if (oper.isConfigured()) {
+ Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName());
+
+ } else {
+ logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName());
+ }
+ }
+ }
+
+ /**
+ * Stops each operation.
+ */
+ @Override
+ protected void doStop() {
+ final String actorName = getName();
+ logger.info("stopping operations for actor {}", actorName);
+
+ // @formatter:off
+ name2operator.values().forEach(
+ oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName()));
+ // @formatter:on
+ }
+
+ /**
+ * Shuts down each operation.
+ */
+ @Override
+ protected void doShutdown() {
+ final String actorName = getName();
+ logger.info("shutting down operations for actor {}", actorName);
+
+ // @formatter:off
+ name2operator.values().forEach(oper -> Util.logException(oper::shutdown,
+ "failed to shutdown operation {}.{}", actorName, oper.getName()));
+ // @formatter:on
+ }
+
+ // TODO old code: remove lines down to **HERE**
+
+ @Override
+ public String actor() {
+ return null;
+ }
+
+ @Override
+ public List<String> recipes() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> recipeTargets(String recipe) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> recipePayloads(String recipe) {
+ return Collections.emptyList();
+ }
+
+ // **HERE**
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
new file mode 100644
index 000000000..80d8fbd04
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
@@ -0,0 +1,757 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import lombok.Getter;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Partial implementation of an operator. Subclasses can choose to simply implement
+ * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
+ * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
+ */
+public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
+
+ private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
+
+ private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
+ private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
+ private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
+
+ @Getter
+ private final String actorName;
+
+ @Getter
+ private final String name;
+
+ /**
+ * Constructs the object.
+ *
+ * @param actorName name of the actor with which this operator is associated
+ * @param name operation name
+ */
+ public OperatorPartial(String actorName, String name) {
+ super(actorName + "." + name);
+ this.actorName = actorName;
+ this.name = name;
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doStart() {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doStop() {
+ // do nothing
+ }
+
+ /**
+ * This method does nothing.
+ */
+ @Override
+ protected void doShutdown() {
+ // do nothing
+ }
+
+ @Override
+ public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
+ if (!isAlive()) {
+ throw new IllegalStateException("operation is not running: " + getFullName());
+ }
+
+ final Executor executor = params.getExecutor();
+
+ // allocate a controller for the entire operation
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
+ if (preproc == null) {
+ // no preprocessor required - just start the operation
+ return startOperationAttempt(params, controller, 1);
+ }
+
+ // propagate "stop" to the preprocessor
+ controller.add(preproc);
+
+ /*
+ * Do preprocessor first and then, if successful, start the operation. Note:
+ * operations create their own outcome, ignoring the outcome from any previous
+ * steps.
+ */
+ preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
+ .thenComposeAsync(handleFailure(params, controller), executor)
+ .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
+ executor);
+
+ return controller;
+ }
+
+ /**
+ * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
+ * invokes the started and completed call-backs.
+ *
+ * @param params operation parameters
+ * @return a future that will return the preprocessor outcome, or {@code null} if this
+ * operation needs no preprocessor
+ */
+ protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
+ logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final ControlLoopOperation operation = params.makeOutcome();
+
+ final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
+ doPreprocessorAsFuture(params);
+ if (preproc == null) {
+ // no preprocessor required
+ return null;
+ }
+
+ // allocate a controller for the preprocessor steps
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ /*
+ * Don't mark it complete until we've built the whole pipeline. This will prevent
+ * the operation from starting until after it has been successfully built (i.e.,
+ * without generating any exceptions).
+ */
+ final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
+
+ // @formatter:off
+ firstFuture
+ .thenComposeAsync(controller.add(preproc), executor)
+ .exceptionally(fromException(params, operation))
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ // start the pipeline
+ firstFuture.complete(operation);
+
+ return controller;
+ }
+
+ /**
+ * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
+ * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
+ * outcome that it received.
+ *
+ * @param params operation parameters
+ * @param controller pipeline controller
+ * @return a function that checks the outcome status and continues, if successful, or
+ * indicates a failure otherwise
+ */
+ private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
+
+ return outcome -> {
+
+ if (outcome != null && isSuccess(outcome)) {
+ logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(outcome);
+ }
+
+ logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final CallbackManager callbacks = new CallbackManager();
+
+ // propagate "stop" to the callbacks
+ controller.add(callbacks);
+
+ final ControlLoopOperation outcome2 = params.makeOutcome();
+
+ // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
+
+ outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
+ outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
+
+ CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+
+ return controller;
+ };
+ }
+
+ /**
+ * Invokes the operation's preprocessor step(s) as a "future". This method simply
+ * returns {@code null}.
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @param params operation parameters
+ * @return a function that will start the preprocessor and returns its outcome, or
+ * {@code null} if this operation needs no preprocessor
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
+ ControlLoopOperationParams params) {
+ return null;
+ }
+
+ /**
+ * Starts the operation attempt, with no preprocessor. When all retries complete, it
+ * will complete the controller.
+ *
+ * @param params operation parameters
+ * @param controller controller for all operation attempts
+ * @param attempt attempt number, typically starting with 1
+ * @return a future that will return the final result of all attempts
+ */
+ private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
+ PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
+
+ final Executor executor = params.getExecutor();
+
+ CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
+
+ // propagate "stop" to the operation attempt
+ controller.add(future);
+
+ // detach when complete
+ future.whenCompleteAsync(controller.delayedRemove(future), executor)
+ .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+
+ return controller;
+ }
+
+ /**
+ * Starts the operation attempt, without doing any retries.
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @return a future that will return the result of a single operation attempt
+ */
+ private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
+ int attempt) {
+
+ logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
+
+ final Executor executor = params.getExecutor();
+ final ControlLoopOperation outcome = params.makeOutcome();
+ final CallbackManager callbacks = new CallbackManager();
+
+ // this operation attempt gets its own controller
+ final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+
+ // propagate "stop" to the callbacks
+ controller.add(callbacks);
+
+ /*
+ * Don't mark it complete until we've built the whole pipeline. This will prevent
+ * the operation from starting until after it has been successfully built (i.e.,
+ * without generating any exceptions).
+ */
+ final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
+
+ // @formatter:off
+ CompletableFuture<ControlLoopOperation> future2 =
+ firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
+ .thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
+ // @formatter:on
+
+ // handle timeouts, if specified
+ long timeoutMillis = getTimeOutMillis(params.getPolicy());
+ if (timeoutMillis > 0) {
+ logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
+ future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /*
+ * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
+ * before callbackCompleted() is invoked.
+ *
+ * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
+ * the pipeline as the last step anyway.
+ */
+
+ // @formatter:off
+ future2.exceptionally(fromException(params, outcome))
+ .thenApplyAsync(setRetryFlag(params, attempt), executor)
+ .thenApplyAsync(callbackStarted(params, callbacks), executor)
+ .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ // start the pipeline
+ firstFuture.complete(outcome);
+
+ return controller;
+ }
+
+ /**
+ * Determines if the outcome was successful.
+ *
+ * @param outcome outcome to examine
+ * @return {@code true} if the outcome was successful
+ */
+ protected boolean isSuccess(ControlLoopOperation outcome) {
+ return OUTCOME_SUCCESS.equals(outcome.getOutcome());
+ }
+
+ /**
+ * Determines if the outcome was a failure for this operator.
+ *
+ * @param outcome outcome to examine, or {@code null}
+ * @return {@code true} if the outcome is not {@code null} and was a failure
+ * <i>and</i> was associated with this operator, {@code false} otherwise
+ */
+ protected boolean isActorFailed(ControlLoopOperation outcome) {
+ return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
+ }
+
+ /**
+ * Invokes the operation as a "future". This method simply invokes
+ * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>verifyRunning() has been invoked</li>
+ * <li>callbackStarted() has been invoked</li>
+ * <li>the invoker will perform appropriate timeout checks</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @return a function that will start the operation and return its result when
+ * complete
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
+ ControlLoopOperationParams params, int attempt) {
+
+ /*
+ * TODO As doOperation() may perform blocking I/O, this should be launched in its
+ * own thread to prevent the ForkJoinPool from being tied up. Should probably
+ * provide a method to make that easy.
+ */
+
+ return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
+ params.getExecutor());
+ }
+
+ /**
+ * Low-level method that performs the operation. This can make the same assumptions
+ * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
+ * method throws an {@link UnsupportedOperationException}.
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @param operation the operation being performed
+ * @return the outcome of the operation
+ */
+ protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
+ ControlLoopOperation operation) {
+
+ throw new UnsupportedOperationException("start operation " + getFullName());
+ }
+
+ /**
+ * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
+ * FAILURE, assuming the policy specifies retries and the retry count has been
+ * exhausted.
+ *
+ * @param params operation parameters
+ * @param attempt latest attempt number, starting with 1
+ * @return a function to get the next future to execute
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
+ int attempt) {
+
+ return operation -> {
+ if (operation != null && !isActorFailed(operation)) {
+ /*
+ * wrong type or wrong operation - just leave it as is. No need to log
+ * anything here, as retryOnFailure() will log a message
+ */
+ return operation;
+ }
+
+ // get a non-null operation
+ ControlLoopOperation oper2;
+ if (operation != null) {
+ oper2 = operation;
+ } else {
+ oper2 = params.makeOutcome();
+ oper2.setOutcome(OUTCOME_FAILURE);
+ }
+
+ if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
+ && attempt > params.getPolicy().getRetry()) {
+ /*
+ * retries were specified and we've already tried them all - change to
+ * FAILURE_RETRIES
+ */
+ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
+ oper2.setOutcome(OUTCOME_RETRIES);
+ }
+
+ return oper2;
+ };
+ }
+
+ /**
+ * Restarts the operation if it was a FAILURE. Assumes that
+ * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
+ * thus that the "operation" is not {@code null}.
+ *
+ * @param params operation parameters
+ * @param controller controller for all of the retries
+ * @param attempt latest attempt number, starting with 1
+ * @return a function to get the next future to execute
+ */
+ private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
+ int attempt) {
+
+ return operation -> {
+ if (!isActorFailed(operation)) {
+ // wrong type or wrong operation - just leave it as is
+ logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+
+ if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
+ // no retries - already marked as FAILURE, so just return it
+ logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+
+
+ /*
+ * Retry the operation.
+ */
+ logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
+
+ return startOperationAttempt(params, controller, attempt + 1);
+ };
+ }
+
+ /**
+ * Gets the outcome of an operation for this operation.
+ *
+ * @param operation operation whose outcome is to be extracted
+ * @return the outcome of the given operation, if it's for this operator, {@code null}
+ * otherwise
+ */
+ protected String getActorOutcome(ControlLoopOperation operation) {
+ if (operation == null) {
+ return null;
+ }
+
+ if (!getActorName().equals(operation.getActor())) {
+ return null;
+ }
+
+ if (!getName().equals(operation.getOperation())) {
+ return null;
+ }
+
+ return operation.getOutcome();
+ }
+
+ /**
+ * Gets a function that will start the next step, if the current operation was
+ * successful, or just return the current operation, otherwise.
+ *
+ * @param params operation parameters
+ * @param nextStep function that will invoke the next step, passing it the operation
+ * @return a function that will start the next step
+ */
+ protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
+ ControlLoopOperationParams params,
+ Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
+
+ return operation -> {
+
+ if (operation == null) {
+ logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
+ ControlLoopOperation outcome = params.makeOutcome();
+ outcome.setOutcome(OUTCOME_FAILURE);
+ return CompletableFuture.completedFuture(outcome);
+
+ } else if (isSuccess(operation)) {
+ logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
+ return nextStep.apply(operation);
+
+ } else {
+ logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(operation);
+ }
+ };
+ }
+
+ /**
+ * Converts an exception into an operation outcome, returning a copy of the outcome to
+ * prevent background jobs from changing it.
+ *
+ * @param params operation parameters
+ * @param operation current operation
+ * @return a function that will convert an exception into an operation outcome
+ */
+ private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
+ ControlLoopOperation operation) {
+
+ return thrown -> {
+ logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
+ params.getRequestId(), thrown);
+
+ /*
+ * Must make a copy of the operation, as the original could be changed by
+ * background jobs that might still be running.
+ */
+ return setOutcome(params, new ControlLoopOperation(operation), thrown);
+ };
+ }
+
+ /**
+ * Gets a function to verify that the operation is still running. If the pipeline is
+ * not running, then it returns an incomplete future, which will effectively halt
+ * subsequent operations in the pipeline. This method is intended to be used with one
+ * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
+ *
+ * @param controller pipeline controller
+ * @param params operation parameters
+ * @return a function to verify that the operation is still running
+ */
+ protected <T> Function<T, CompletableFuture<T>> verifyRunning(
+ PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
+
+ return value -> {
+ boolean running = controller.isRunning();
+ logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
+
+ return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
+ };
+ }
+
+ /**
+ * Sets the start time of the operation and invokes the callback to indicate that the
+ * operation has started. Does nothing if the pipeline has been stopped.
+ * <p/>
+ * This assumes that the "outcome" is not {@code null}.
+ *
+ * @param params operation parameters
+ * @param callbacks used to determine if the start callback can be invoked
+ * @return a function that sets the start time and invokes the callback
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
+ CallbackManager callbacks) {
+
+ return outcome -> {
+
+ if (callbacks.canStart()) {
+ // haven't invoked "start" callback yet
+ outcome.setStart(callbacks.getStartTime());
+ outcome.setEnd(null);
+ params.callbackStarted(outcome);
+ }
+
+ return outcome;
+ };
+ }
+
+ /**
+ * Sets the end time of the operation and invokes the callback to indicate that the
+ * operation has completed. Does nothing if the pipeline has been stopped.
+ * <p/>
+ * This assumes that the "outcome" is not {@code null}.
+ * <p/>
+ * Note: the start time must be a reference rather than a plain value, because it's
+ * value must be gotten on-demand, when the returned function is executed at a later
+ * time.
+ *
+ * @param params operation parameters
+ * @param callbacks used to determine if the end callback can be invoked
+ * @return a function that sets the end time and invokes the callback
+ */
+ private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
+ CallbackManager callbacks) {
+
+ return operation -> {
+
+ if (callbacks.canEnd()) {
+ operation.setStart(callbacks.getStartTime());
+ operation.setEnd(callbacks.getEndTime());
+ params.callbackCompleted(operation);
+ }
+
+ return operation;
+ };
+ }
+
+ /**
+ * Sets an operation's outcome and message, based on a throwable.
+ *
+ * @param params operation parameters
+ * @param operation operation to be updated
+ * @return the updated operation
+ */
+ protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ Throwable thrown) {
+ PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
+ return setOutcome(params, operation, result);
+ }
+
+ /**
+ * Sets an operation's outcome and default message based on the result.
+ *
+ * @param params operation parameters
+ * @param operation operation to be updated
+ * @param result result of the operation
+ * @return the updated operation
+ */
+ protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ PolicyResult result) {
+ logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
+ operation.setOutcome(result.toString());
+ operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
+ : ControlLoopOperation.FAILED_MSG);
+
+ return operation;
+ }
+
+ /**
+ * Determines if a throwable is due to a timeout.
+ *
+ * @param thrown throwable of interest
+ * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
+ */
+ protected boolean isTimeout(Throwable thrown) {
+ if (thrown instanceof CompletionException) {
+ thrown = thrown.getCause();
+ }
+
+ return (thrown instanceof TimeoutException);
+ }
+
+ // these may be overridden by junit tests
+
+ /**
+ * Gets the operation timeout. Subclasses may override this method to obtain the
+ * timeout in some other way (e.g., through configuration properties).
+ *
+ * @param policy policy from which to extract the timeout
+ * @return the operation timeout, in milliseconds
+ */
+ protected long getTimeOutMillis(Policy policy) {
+ Integer timeoutSec = policy.getTimeout();
+ return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Manager for "start" and "end" callbacks.
+ */
+ private static class CallbackManager implements Runnable {
+ private final AtomicReference<Instant> startTime = new AtomicReference<>();
+ private final AtomicReference<Instant> endTime = new AtomicReference<>();
+
+ /**
+ * Determines if the "start" callback can be invoked. If so, it sets the
+ * {@link #startTime} to the current time.
+ *
+ * @return {@code true} if the "start" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canStart() {
+ return startTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Determines if the "end" callback can be invoked. If so, it sets the
+ * {@link #endTime} to the current time.
+ *
+ * @return {@code true} if the "end" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canEnd() {
+ return endTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Gets the start time.
+ *
+ * @return the start time, or {@code null} if {@link #canStart()} has not been
+ * invoked yet.
+ */
+ public Instant getStartTime() {
+ return startTime.get();
+ }
+
+ /**
+ * Gets the end time.
+ *
+ * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
+ * yet.
+ */
+ public Instant getEndTime() {
+ return endTime.get();
+ }
+
+ /**
+ * Prevents further callbacks from being executed by setting {@link #startTime}
+ * and {@link #endTime}.
+ */
+ @Override
+ public void run() {
+ canStart();
+ canEnd();
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java
new file mode 100644
index 000000000..6c883f1b5
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java
@@ -0,0 +1,153 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import lombok.Getter;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Partial implementation of an object that is both startable and configurable. It
+ * provides the high level methods defined in the interface, while deferring the details
+ * to abstract methods that must be provided by the subclasses. It also manages the
+ * current {@link #state}.
+ *
+ * @param <T> type of parameters expected by {@link #configure(Object)}
+ */
+public abstract class StartConfigPartial<T> implements Startable, Configurable<T> {
+ private static final Logger logger = LoggerFactory.getLogger(StartConfigPartial.class);
+
+ @Getter
+ private final String fullName;
+
+ public enum State {
+ IDLE, CONFIGURED, ALIVE
+ }
+
+ private State state = State.IDLE;
+
+ /**
+ * Constructs the object.
+ *
+ * @param fullName full name of this object, used for logging and exception purposes
+ */
+ public StartConfigPartial(String fullName) {
+ this.fullName = fullName;
+ }
+
+ @Override
+ public synchronized boolean isAlive() {
+ return (state == State.ALIVE);
+ }
+
+ /**
+ * Determines if this object has been configured.
+ *
+ * @return {@code true} if this object has been configured, {@code false} otherwise
+ */
+ public synchronized boolean isConfigured() {
+ return (state != State.IDLE);
+ }
+
+ @Override
+ public synchronized void configure(T parameters) {
+ if (isAlive()) {
+ throw new IllegalStateException("attempt to reconfigure, but already running " + getFullName());
+ }
+
+ logger.info("initializing {}", getFullName());
+
+ doConfigure(parameters);
+
+ state = State.CONFIGURED;
+ }
+
+ @Override
+ public synchronized boolean start() {
+ switch (state) {
+ case ALIVE:
+ logger.info("{} is already running", getFullName());
+ break;
+
+ case CONFIGURED:
+ logger.info("starting {}", getFullName());
+ doStart();
+ state = State.ALIVE;
+ break;
+
+ case IDLE:
+ default:
+ throw new IllegalStateException("attempt to start unconfigured " + getFullName());
+ }
+
+ return true;
+ }
+
+ @Override
+ public synchronized boolean stop() {
+ if (isAlive()) {
+ logger.info("stopping {}", getFullName());
+ state = State.CONFIGURED;
+ doStop();
+
+ } else {
+ logger.info("{} is not running", getFullName());
+ }
+
+ return true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (!isAlive()) {
+ logger.info("{} is not running", getFullName());
+ return;
+ }
+
+ logger.info("shutting down actor {}", getFullName());
+ state = State.CONFIGURED;
+ doShutdown();
+ }
+
+ /**
+ * Configures this object.
+ *
+ * @param parameters configuration parameters
+ */
+ protected abstract void doConfigure(T parameters);
+
+ /**
+ * Starts this object.
+ */
+ protected abstract void doStart();
+
+ /**
+ * Stops this object.
+ */
+ protected abstract void doStop();
+
+ /**
+ * Shuts down this object.
+ */
+ protected abstract void doShutdown();
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
new file mode 100644
index 000000000..08aba81f2
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
@@ -0,0 +1,210 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Consumer;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.annotations.NotNull;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.policy.Policy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Parameters for control loop operations. The executor defaults to
+ * {@link ForkJoinPool#commonPool()}, but may be overridden.
+ */
+@Getter
+@Builder(toBuilder = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+public class ControlLoopOperationParams {
+
+ private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class);
+
+ public static final String UNKNOWN = "-unknown-";
+
+
+ /**
+ * The actor service in which to find the actor/operation.
+ */
+ @NotNull
+ private ActorService actorService;
+
+ /**
+ * The event for which the operation applies.
+ */
+ @NotNull
+ private ControlLoopEventContext context;
+
+ /**
+ * The executor to use to run the operation.
+ */
+ @NotNull
+ @Builder.Default
+ private Executor executor = ForkJoinPool.commonPool();
+
+ /**
+ * The policy associated with the operation.
+ */
+ @NotNull
+ private Policy policy;
+
+ /**
+ * The function to invoke when the operation starts. This is optional.
+ * <p/>
+ * Note: this may be invoked multiple times, but with different actor/operations. That
+ * may happen if the current operation requires other operations to be performed first
+ * (e.g., A&AI queries, guard checks).
+ */
+ private Consumer<ControlLoopOperation> startCallback;
+
+ /**
+ * The function to invoke when the operation completes. This is optional.
+ * <p/>
+ * Note: this may be invoked multiple times, but with different actor/operations. That
+ * may happen if the current operation requires other operations to be performed first
+ * (e.g., A&AI queries, guard checks).
+ */
+ private Consumer<ControlLoopOperation> completeCallback;
+
+ /**
+ * Target entity.
+ */
+ @NotNull
+ private String target;
+
+ /**
+ * Starts the specified operation.
+ *
+ * @return a future that will return the result of the operation
+ * @throws IllegalArgumentException if the parameters are invalid
+ */
+ public CompletableFuture<ControlLoopOperation> start() {
+ BeanValidationResult result = validate();
+ if (!result.isValid()) {
+ logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(),
+ result.getResult());
+ throw new IllegalArgumentException("invalid parameters");
+ }
+
+ // @formatter:off
+ return actorService
+ .getActor(policy.getActor())
+ .getOperator(policy.getRecipe())
+ .startOperation(this);
+ // @formatter:on
+ }
+
+ /**
+ * Gets the name of the actor from the policy.
+ *
+ * @return the actor name, or {@link #UNKNOWN} if no name is available
+ */
+ public String getActor() {
+ return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor());
+ }
+
+ /**
+ * Gets the name of the operation from the policy.
+ *
+ * @return the operation name, or {@link #UNKNOWN} if no name is available
+ */
+ public String getOperation() {
+ return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe());
+ }
+
+ /**
+ * Gets the requested ID of the associated event.
+ *
+ * @return the event's request ID, or {@code null} if no request ID is available
+ */
+ public UUID getRequestId() {
+ return (context == null || context.getEvent() == null ? null : context.getEvent().getRequestId());
+ }
+
+ /**
+ * Makes an operation outcome, populating it from the parameters.
+ *
+ * @return a new operation outcome
+ */
+ public ControlLoopOperation makeOutcome() {
+ ControlLoopOperation operation = new ControlLoopOperation();
+ operation.setActor(getActor());
+ operation.setOperation(getOperation());
+ operation.setTarget(target);
+
+ return operation;
+ }
+
+ /**
+ * Invokes the callback to indicate that the operation has started. Any exceptions
+ * generated by the callback are logged, but not re-thrown.
+ *
+ * @param operation the operation that is being started
+ */
+ public void callbackStarted(ControlLoopOperation operation) {
+ logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId());
+
+ if (startCallback != null) {
+ Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}",
+ operation.getActor(), operation.getOperation(), getRequestId());
+ }
+ }
+
+ /**
+ * Invokes the callback to indicate that the operation has completed. Any exceptions
+ * generated by the callback are logged, but not re-thrown.
+ *
+ * @param operation the operation that is being started
+ */
+ public void callbackCompleted(ControlLoopOperation operation) {
+ logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(),
+ operation.getOutcome(), getRequestId());
+
+ if (completeCallback != null) {
+ Util.logException(() -> completeCallback.accept(operation),
+ "{}.{}: complete-callback threw an exception for {}", operation.getActor(),
+ operation.getOperation(), getRequestId());
+ }
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @return the validation result
+ */
+ public BeanValidationResult validate() {
+ return new BeanValidator().validateTop(ControlLoopOperationParams.class.getSimpleName(), this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
new file mode 100644
index 000000000..da4fb4f0c
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import java.util.Map;
+import java.util.function.Function;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+/**
+ * Parameters used by Actors that connect to a server via HTTP. This contains the
+ * parameters that are common to all of the operations. Only the path changes for each
+ * operation, thus it includes a mapping from operation name to path.
+ */
+@Data
+@NotNull
+@NotBlank
+public class HttpActorParams {
+
+ /**
+ * Name of the HttpClient, as found in the HttpClientFactory.
+ */
+ private String clientName;
+
+ /**
+ * Amount of time, in seconds to wait for the HTTP request to complete, where zero
+ * indicates that it should wait forever. The default is zero.
+ */
+ @Min(0)
+ private long timeoutSec = 0;
+
+ /**
+ * Maps the operation name to its URI path.
+ */
+ private Map<String, String> path;
+
+ /**
+ * Extracts a specific operation's parameters from "this".
+ *
+ * @param name name of the item containing "this"
+ * @return a function to extract an operation's parameters from "this". Note: the
+ * returned function is not thread-safe
+ */
+ public Function<String, Map<String, Object>> makeOperationParameters(String name) {
+ HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build();
+
+ return operation -> {
+ String subpath = path.get(operation);
+ if (subpath == null) {
+ return null;
+ }
+
+ subparams.setPath(subpath);
+ return Util.translateToMap(name + "." + operation, subparams);
+ };
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @param name name of the object containing these parameters
+ * @return "this"
+ * @throws IllegalArgumentException if the parameters are invalid
+ */
+ public HttpActorParams doValidation(String name) {
+ ValidationResult result = validate(name);
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+
+ return this;
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
+
+ result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
+
+ return result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
new file mode 100644
index 000000000..695ffe4dd
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Builder;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+/**
+ * Parameters used by Operators that connect to a server via HTTP.
+ */
+@NotNull
+@NotBlank
+@Data
+@Builder(toBuilder = true)
+public class HttpParams {
+
+ /**
+ * Name of the HttpClient, as found in the HttpClientFactory.
+ */
+ private String clientName;
+
+ /**
+ * URI path.
+ */
+ private String path;
+
+ /**
+ * Amount of time, in seconds to wait for the HTTP request to complete, where zero
+ * indicates that it should wait forever. The default is zero.
+ */
+ @Min(0)
+ @Builder.Default
+ private long timeoutSec = 0;
+
+
+ /**
+ * Validates the parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ return new BeanValidator().validateTop(resultName, this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java
new file mode 100644
index 000000000..3004e1932
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Getter;
+import org.onap.policy.common.parameters.ValidationResult;
+
+/**
+ * Parameter runtime exception, with an associated validation result. This is used to
+ * throw an exception while passing a validation result up the chain.
+ * <p/>
+ * Note: the validation result is <i>not</i> included in the exception message.
+ */
+public class ParameterValidationRuntimeException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ @Getter
+ private final transient ValidationResult result;
+
+
+ public ParameterValidationRuntimeException(ValidationResult result) {
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(String message, ValidationResult result) {
+ super(message);
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(Throwable cause, ValidationResult result) {
+ super(cause);
+ this.result = result;
+ }
+
+ public ParameterValidationRuntimeException(String message, Throwable cause, ValidationResult result) {
+ super(message, cause);
+ this.result = result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java
new file mode 100644
index 000000000..9e6d8a15e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.Builder;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+/**
+ * Parameters used by Operators that connect to a server via DMaaP.
+ */
+@NotNull
+@NotBlank
+@Data
+@Builder(toBuilder = true)
+public class TopicParams {
+
+ /**
+ * Name of the target topic end point to which requests should be published.
+ */
+ private String target;
+
+ /**
+ * Source topic end point, from which to read responses.
+ */
+ private String source;
+
+ /**
+ * Amount of time, in seconds to wait for the response, where zero indicates that it
+ * should wait forever. The default is zero.
+ */
+ @Min(0)
+ @Builder.Default
+ private long timeoutSec = 0;
+
+ /**
+ * Validates both the publisher and the subscriber parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ return new BeanValidator().validateTop(resultName, this);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java
new file mode 100644
index 000000000..aac2f77b7
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.Future;
+import lombok.NoArgsConstructor;
+
+/**
+ * Manager that manages both futures and listeners. When {@link #stop()} is called, the
+ * listeners are executed and the futures are canceled. The various methods synchronize on
+ * "this" while they manipulate internal data structures.
+ */
+@NoArgsConstructor
+public class FutureManager extends ListenerManager {
+
+ /**
+ * Maps a future to its listener. Records the {@link Runnable} that is passed to
+ * {@link ListenerManager#add(Runnable)} when {@link #add(Future)} is invoked. This is
+ * needed if {@link #remove(Future)} is invoked, so that the same {@link Runnable} is
+ * used each time.
+ */
+ @SuppressWarnings("rawtypes")
+ private final IdentityHashMap<Future, Runnable> future2listener = new IdentityHashMap<>(5);
+
+ /**
+ * Adds a future that is to be canceled when this controller is stopped. Note: if the
+ * controller is already stopped, then the future will be canceled immediately, within
+ * the invoking thread.
+ *
+ * @param future future to be added
+ */
+ public <T> void add(Future<T> future) {
+ Runnable listener = () -> future.cancel(false);
+
+ synchronized (this) {
+ if (future2listener.putIfAbsent(future, listener) != null) {
+ // this future is already in the map, nothing more to do
+ return;
+ }
+
+ if (addOnly(listener)) {
+ // successfully added
+ return;
+ }
+ }
+
+ runListener(listener);
+ }
+
+ /**
+ * Removes a future so that it is not canceled when this controller is stopped.
+ *
+ * @param future future to be removed
+ */
+ public synchronized <T> void remove(Future<T> future) {
+ Runnable listener = future2listener.remove(future);
+ if (listener != null) {
+ remove(listener);
+ }
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+
+ synchronized (this) {
+ future2listener.clear();
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
new file mode 100644
index 000000000..d34a3fb5b
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+/**
+ * Listener manager, used by operations within the pipeline to determine if they should
+ * continue to run. When {@link #stop()} is called, the listeners are executed. The
+ * various methods synchronize on "this" while they manipulate internal data structures.
+ */
+@NoArgsConstructor
+public class ListenerManager {
+
+ @Getter
+ private volatile boolean running = true;
+
+ /**
+ * Listeners to be executed when {@link #stop()} is invoked.
+ */
+ private final IdentityHashMap<Runnable, Void> listeners = new IdentityHashMap<>(5);
+
+ /**
+ * Indicates that operations within the pipeline should stop executing.
+ */
+ public void stop() {
+ ArrayList<Runnable> items;
+
+ synchronized (this) {
+ if (!running) {
+ return;
+ }
+
+ running = false;
+ items = new ArrayList<>(listeners.keySet());
+ listeners.clear();
+ }
+
+ items.forEach(this::runListener);
+ }
+
+ /**
+ * Adds a listener that is to be invoked when this controller is stopped. Note: if the
+ * controller is already stopped, then the listener will be invoked immediately,
+ * within the invoking thread.
+ *
+ * @param listener listener to be added
+ */
+ public void add(Runnable listener) {
+ if (!addOnly(listener)) {
+ runListener(listener);
+ }
+ }
+
+ /**
+ * Adds a listener that is to be invoked when this controller is stopped. Note: if the
+ * controller is already stopped, then the listener will be invoked immediately,
+ * within the invoking thread.
+ *
+ * @param listener listener to be added
+ * @return {@code true} if the the listener was added, {@code false} if it could not
+ * be added because this manager has already been stopped
+ */
+ protected boolean addOnly(Runnable listener) {
+ synchronized (this) {
+ if (running) {
+ listeners.put(listener, null);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Runs a listener, catching any exceptions that it may throw.
+ *
+ * @param listener listener to be executed
+ */
+ protected void runListener(Runnable listener) {
+ // TODO do this asynchronously?
+ Util.logException(listener, "pipeline listener {} threw an exception", listener);
+ }
+
+ /**
+ * Removes a listener so that it is not invoked when this controller is stopped.
+ *
+ * @param listener listener to be removed
+ */
+ public synchronized void remove(Runnable listener) {
+ listeners.remove(listener);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
new file mode 100644
index 000000000..96c8f9e05
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
@@ -0,0 +1,157 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.pipeline;
+
+import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline controller, used by operations within the pipeline to determine if they should
+ * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
+ * pipeline.
+ */
+@NoArgsConstructor
+public class PipelineControllerFuture<T> extends CompletableFuture<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
+
+ /**
+ * Tracks items added to this controller via one of the <i>add</i> methods.
+ */
+ private final FutureManager futures = new FutureManager();
+
+
+ /**
+ * Cancels and stops the pipeline, in that order.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ try {
+ logger.trace("{}: cancel future", ident(this));
+ return super.cancel(mayInterruptIfRunning);
+
+ } finally {
+ futures.stop();
+ }
+ }
+
+ /**
+ * Generates a function that, when invoked, will remove the given future. This is
+ * typically added onto the end of a pipeline via one of the
+ * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that removes the given future
+ */
+ public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
+ return (value, thrown) -> {
+ logger.trace("{}: remove future {}", ident(this), ident(future));
+ remove(future);
+ };
+ }
+
+ /**
+ * Generates a function that, when invoked, will remove the given listener. This is
+ * typically added onto the end of a pipeline via one of the
+ * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that removes the given listener
+ */
+ public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
+ return (value, thrown) -> {
+ logger.trace("{}: remove listener {}", ident(this), ident(listener));
+ remove(listener);
+ };
+ }
+
+ /**
+ * Generates a function that, when invoked, will stop all pipeline listeners and
+ * complete this future. This is typically added onto the end of a pipeline via one of
+ * the {@link CompletableFuture#whenComplete(BiConsumer)} methods.
+ *
+ * @return a function that stops all pipeline listeners
+ */
+ public BiConsumer<T, Throwable> delayedComplete() {
+ return (value, thrown) -> {
+ if (thrown == null) {
+ logger.trace("{}: complete and stop future", ident(this));
+ complete(value);
+ } else {
+ logger.trace("{}: complete exceptionally and stop future", ident(this));
+ completeExceptionally(thrown);
+ }
+
+ futures.stop();
+ };
+ }
+
+ /**
+ * Adds a function whose return value is to be canceled when this controller is
+ * stopped. Note: if the controller is already stopped, then the function will
+ * <i>not</i> be executed.
+ *
+ * @param futureMaker function to be invoked in the future
+ */
+ public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
+
+ return input -> {
+ if (!isRunning()) {
+ logger.trace("{}: discarded new future", ident(this));
+ return new CompletableFuture<>();
+ }
+
+ CompletableFuture<F> future = futureMaker.apply(input);
+ add(future);
+
+ return future;
+ };
+ }
+
+ public <F> void add(Future<F> future) {
+ logger.trace("{}: add future {}", ident(this), ident(future));
+ futures.add(future);
+ }
+
+ public void add(Runnable listener) {
+ logger.trace("{}: add listener {}", ident(this), ident(listener));
+ futures.add(listener);
+ }
+
+ public boolean isRunning() {
+ return futures.isRunning();
+ }
+
+ public <F> void remove(Future<F> future) {
+ logger.trace("{}: remove future {}", ident(this), ident(future));
+ futures.remove(future);
+ }
+
+ public void remove(Runnable listener) {
+ logger.trace("{}: remove listener {}", ident(this), ident(listener));
+ futures.remove(listener);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
index 88f3c16eb..620950a3c 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Actor
* ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,9 +21,56 @@
package org.onap.policy.controlloop.actorserviceprovider.spi;
+import java.util.Collection;
+
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.onap.policy.common.capabilities.Configurable;
+import org.onap.policy.common.capabilities.Startable;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+
+/**
+ * This is the service interface for defining an Actor used in Control Loop Operational
+ * Policies for performing actions on runtime entities.
+ *
+ * @author pameladragosh
+ *
+ */
+public interface Actor extends Startable, Configurable<Map<String,Object>> {
+
+ /**
+ * Gets the name of the actor.
+ *
+ * @return the actor name
+ */
+ String getName();
+
+ /**
+ * Gets a particular operator.
+ *
+ * @param name name of the operation of interest
+ * @return the desired operation
+ * @throws IllegalArgumentException if no operation by the given name exists
+ */
+ Operator getOperator(String name);
+
+ /**
+ * Gets the supported operations.
+ *
+ * @return the supported operations
+ */
+ public Collection<Operator> getOperators();
+
+ /**
+ * Gets the names of the supported operations.
+ *
+ * @return the names of the supported operations
+ */
+ public Set<String> getOperationNames();
+
-public interface Actor {
+ // TODO old code: remove lines down to **HERE**
String actor();
@@ -33,4 +80,5 @@ public interface Actor {
List<String> recipePayloads(String recipe);
+ // **HERE**
}