diff options
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
17 files changed, 2546 insertions, 26 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java index 809936146..13f09b1ad 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ActorService * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018, 2020 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,25 +21,56 @@ package org.onap.policy.controlloop.actorserviceprovider; -import com.google.common.collect.ImmutableList; - -import java.util.Iterator; +import com.google.common.collect.ImmutableMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.ServiceLoader; - +import java.util.Set; +import org.onap.policy.common.parameters.BeanValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.impl.StartConfigPartial; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; import org.onap.policy.controlloop.actorserviceprovider.spi.Actor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ActorService { - +/** + * Service that manages a set of actors. To use the service, first invoke + * {@link #configure(Map)} to configure all of the actors, and then invoke + * {@link #start()} to start all of the actors. When finished using the actor service, + * invoke {@link #stop()} or {@link #shutdown()}. + */ +public class ActorService extends StartConfigPartial<Map<String, Object>> { private static final Logger logger = LoggerFactory.getLogger(ActorService.class); - private static ActorService service; - // USed to load actors - private final ServiceLoader<Actor> loader; + private final Map<String, Actor> name2actor; + + private static class LazyHolder { + static final ActorService INSTANCE = new ActorService(); + } + + /** + * Constructs the object and loads the list of actors. + */ + protected ActorService() { + super("actors"); + + Map<String, Actor> map = new HashMap<>(); + + for (Actor newActor : loadActors()) { + map.compute(newActor.getName(), (name, existingActor) -> { + if (existingActor == null) { + return newActor; + } + + // TODO: should this throw an exception? + logger.warn("duplicate actor names for {}: {}, ignoring {}", name, + existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName()); + return existingActor; + }); + } - private ActorService() { - loader = ServiceLoader.load(Actor.class); + name2actor = ImmutableMap.copyOf(map); } /** @@ -47,27 +78,115 @@ public class ActorService { * * @return the instance */ - public static synchronized ActorService getInstance() { - if (service == null) { - service = new ActorService(); + public static ActorService getInstance() { + return LazyHolder.INSTANCE; + } + + /** + * Gets a particular actor. + * + * @param name name of the actor of interest + * @return the desired actor + * @throws IllegalArgumentException if no actor by the given name exists + */ + public Actor getActor(String name) { + Actor actor = name2actor.get(name); + if (actor == null) { + throw new IllegalArgumentException("unknown actor " + name); } - return service; + + return actor; } /** - * Get the actors. + * Gets the actors. * * @return the actors */ - public ImmutableList<Actor> actors() { - Iterator<Actor> iter = loader.iterator(); - logger.debug("returning actors"); - while (iter.hasNext()) { - if (logger.isDebugEnabled()) { - logger.debug("Got {}", iter.next().actor()); + public Collection<Actor> getActors() { + return name2actor.values(); + } + + /** + * Gets the names of the actors. + * + * @return the actor names + */ + public Set<String> getActorNames() { + return name2actor.keySet(); + } + + @Override + protected void doConfigure(Map<String, Object> parameters) { + logger.info("configuring actors"); + + BeanValidationResult valres = new BeanValidationResult("ActorService", parameters); + + for (Actor actor : name2actor.values()) { + String actorName = actor.getName(); + Map<String, Object> subparams = Util.translateToMap(actorName, parameters.get(actorName)); + + if (subparams != null) { + + try { + actor.configure(subparams); + + } catch (ParameterValidationRuntimeException e) { + logger.warn("failed to configure actor {}", actorName, e); + valres.addResult(e.getResult()); + + } catch (RuntimeException e) { + logger.warn("failed to configure actor {}", actorName, e); + } + + } else if (actor.isConfigured()) { + logger.warn("missing configuration parameters for actor {}; using previous parameters", actorName); + + } else { + logger.warn("missing configuration parameters for actor {}; actor cannot be started", actorName); } } - return ImmutableList.copyOf(loader.iterator()); + if (!valres.isValid() && logger.isWarnEnabled()) { + logger.warn("actor services validation errors:\n{}", valres.getResult()); + } + } + + @Override + protected void doStart() { + logger.info("starting actors"); + + for (Actor actor : name2actor.values()) { + if (actor.isConfigured()) { + Util.logException(actor::start, "failed to start actor {}", actor.getName()); + + } else { + logger.warn("not starting unconfigured actor {}", actor.getName()); + } + } + } + + @Override + protected void doStop() { + logger.info("stopping actors"); + name2actor.values() + .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName())); + } + + @Override + protected void doShutdown() { + logger.info("shutting down actors"); + + // @formatter:off + name2actor.values().forEach( + actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName())); + + // @formatter:on + } + + // the following methods may be overridden by junit tests + + protected Iterable<Actor> loadActors() { + return ServiceLoader.load(Actor.class); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java new file mode 100644 index 000000000..b7a9a53ad --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/DelayedIdentString.java @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import lombok.AllArgsConstructor; + +/** + * Object whose {@link #toString()} method invokes {@link Object#toString()} on another + * object, on-demand. This assumes that the other object's method returns an object + * identifier. This is typically used to include an object's identifier in a log message. + */ +@AllArgsConstructor +public class DelayedIdentString { + /** + * String to return for null objects or null object identifiers. + */ + public static final String NULL_STRING = "null"; + + private final Object object; + + /** + * Gets the object's identifier, after stripping anything appearing before '@'. + */ + @Override + public String toString() { + if (object == null) { + return NULL_STRING; + } + + String ident = objectToString(); + if (ident == null) { + return NULL_STRING; + } + + int index = ident.indexOf('@'); + return (index > 0 ? ident.substring(index) : ident); + } + + /** + * Invokes the object's {@link Object#toString()} method. + * + * @return the output from the object's {@link Object#toString()} method + */ + protected String objectToString() { + return object.toString(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java new file mode 100644 index 000000000..e308ee42e --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.onap.policy.common.capabilities.Configurable; +import org.onap.policy.common.capabilities.Startable; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; + +/** + * This is the service interface for defining an Actor operation used in Control Loop + * Operational Policies for performing actions on runtime entities. + */ +public interface Operator extends Startable, Configurable<Map<String, Object>> { + + /** + * Gets the name of the associated actor. + * + * @return the name of the associated actor + */ + String getActorName(); + + /** + * Gets the name of the operation. + * + * @return the operation name + */ + String getName(); + + /** + * Called by enforcement PDP engine to start the operation. As part of the operation, + * it invokes the "start" and "complete" call-backs found within the parameters. + * + * @param params parameters needed to start the operation + * @return a future that can be used to cancel or await the result of the operation + */ + CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java new file mode 100644 index 000000000..0aba1a7fa --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java @@ -0,0 +1,114 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Actor utilities. + */ +public class Util { + private static final Logger logger = LoggerFactory.getLogger(Util.class); + + private Util() { + // do nothing + } + + /** + * Extracts an object's identity by invoking {@link Object#toString()} and returning + * the portion starting with "@". Extraction is done on-demand, when toString() is + * called on the result. This is typically used when logging. + * + * @param object object whose identity is to be extracted + * @return an object that will extract the source object's identity when this object's + * toString() method is called + */ + public static Object ident(Object object) { + return new DelayedIdentString(object); + } + + /** + * Runs a function and logs a message if it throws an exception. Does <i>not</i> + * re-throw the exception. + * + * @param function function to be run + * @param exceptionMessage message to log if an exception is thrown + * @param exceptionArgs arguments to be passed to the logger + */ + public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) { + try { + function.run(); + + } catch (RuntimeException ex) { + // create a new array containing the original arguments plus the exception + Object[] allArgs = Arrays.copyOf(exceptionArgs, exceptionArgs.length + 1); + allArgs[exceptionArgs.length] = ex; + + logger.warn(exceptionMessage, allArgs); + } + } + + /** + * Translates parameters from one class to another, typically from a Map to a POJO or + * vice versa. + * + * @param identifier identifier of the actor/operation being translated; used to build + * an exception message + * @param source source object to be translated + * @param clazz target class + * @return the translated object + */ + public static <T> T translate(String identifier, Object source, Class<T> clazz) { + Coder coder = new StandardCoder(); + + try { + String json = coder.encode(source); + return coder.decode(json, clazz); + + } catch (CoderException | RuntimeException e) { + throw new IllegalArgumentException("cannot translate parameters for " + identifier, e); + } + } + + /** + * Translates parameters to a Map. + * + * @param identifier identifier of the actor/operation being translated; used to build + * an exception message + * @param source source parameters + * @return the parameters, as a Map + */ + @SuppressWarnings("unchecked") + public static Map<String, Object> translateToMap(String identifier, Object source) { + if (source == null) { + return null; + } + + return translate(identifier, source, LinkedHashMap.class); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java new file mode 100644 index 000000000..68bbe7edc --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java @@ -0,0 +1,90 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.controlloop; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.aai.AaiCqResponse; +import org.onap.policy.controlloop.VirtualControlLoopEvent; + +/** + * Context associated with a control loop event. + */ +@Getter +@Setter +public class ControlLoopEventContext implements Serializable { + private static final long serialVersionUID = 1L; + + @Setter(AccessLevel.NONE) + private final VirtualControlLoopEvent event; + + private AaiCqResponse aaiCqResponse; + + // TODO may remove this if it proves not to be needed + @Getter(AccessLevel.NONE) + @Setter(AccessLevel.NONE) + private Map<String, Serializable> properties = new ConcurrentHashMap<>(); + + /** + * Constructs the object. + * + * @param event event with which this is associated + */ + public ControlLoopEventContext(VirtualControlLoopEvent event) { + this.event = event; + } + + /** + * Determines if the context contains a property. + * + * @param name name of the property of interest + * @return {@code true} if the context contains the property, {@code false} otherwise + */ + public boolean contains(String name) { + return properties.containsKey(name); + } + + /** + * Gets a property, casting it to the desired type. + * + * @param <T> desired type + * @param name name of the property whose value is to be retrieved + * @return the property's value, or {@code null} if it does not yet have a value + */ + @SuppressWarnings("unchecked") + public <T> T getProperty(String name) { + return (T) properties.get(name); + } + + /** + * Sets a property's value. + * + * @param name property name + * @param value new property value + */ + public void setProperty(String name, Serializable value) { + properties.put(name, value); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java new file mode 100644 index 000000000..9b9aa914e --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java @@ -0,0 +1,239 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import org.onap.policy.common.parameters.BeanValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Operator; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; +import org.onap.policy.controlloop.actorserviceprovider.spi.Actor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of an actor. + */ +public class ActorImpl extends StartConfigPartial<Map<String, Object>> implements Actor { + private static final Logger logger = LoggerFactory.getLogger(ActorImpl.class); + + /** + * Maps a name to an operator. + */ + private Map<String, Operator> name2operator; + + /** + * Constructs the object. + * + * @param name actor name + * @param operators the operations supported by this actor + */ + public ActorImpl(String name, Operator... operators) { + super(name); + setOperators(Arrays.asList(operators)); + } + + /** + * Sets the operators supported by this actor, overriding any previous list. + * + * @param operators the operations supported by this actor + */ + protected void setOperators(List<Operator> operators) { + if (isConfigured()) { + throw new IllegalStateException("attempt to set operators on a configured actor: " + getName()); + } + + Map<String, Operator> map = new HashMap<>(); + for (Operator newOp : operators) { + map.compute(newOp.getName(), (opName, existingOp) -> { + if (existingOp == null) { + return newOp; + } + + // TODO: should this throw an exception? + logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, + existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName()); + return existingOp; + }); + } + + this.name2operator = ImmutableMap.copyOf(map); + } + + @Override + public String getName() { + return getFullName(); + } + + @Override + public Operator getOperator(String name) { + Operator operator = name2operator.get(name); + if (operator == null) { + throw new IllegalArgumentException("unknown operation " + getName() + "." + name); + } + + return operator; + } + + @Override + public Collection<Operator> getOperators() { + return name2operator.values(); + } + + @Override + public Set<String> getOperationNames() { + return name2operator.keySet(); + } + + /** + * For each operation, it looks for a set of parameters by the same name and, if + * found, configures the operation with the parameters. + */ + @Override + protected void doConfigure(Map<String, Object> parameters) { + final String actorName = getName(); + logger.info("configuring operations for actor {}", actorName); + + BeanValidationResult valres = new BeanValidationResult(actorName, parameters); + + // function that creates operator-specific parameters, given the operation name + Function<String, Map<String, Object>> opParamsMaker = makeOperatorParameters(parameters); + + for (Operator operator : name2operator.values()) { + String operName = operator.getName(); + Map<String, Object> subparams = opParamsMaker.apply(operName); + + if (subparams != null) { + + try { + operator.configure(subparams); + + } catch (ParameterValidationRuntimeException e) { + logger.warn("failed to configure operation {}.{}", actorName, operName, e); + valres.addResult(e.getResult()); + + } catch (RuntimeException e) { + logger.warn("failed to configure operation {}.{}", actorName, operName, e); + } + + } else if (operator.isConfigured()) { + logger.warn("missing configuration parameters for operation {}.{}; using previous parameters", + actorName, operName); + + } else { + logger.warn("missing configuration parameters for operation {}.{}; operation cannot be started", + actorName, operName); + } + } + } + + /** + * Extracts the operator parameters from the actor parameters, for a given operator. + * This method assumes each operation has its own set of parameters. + * + * @param actorParameters actor parameters + * @return a function to extract the operator parameters from the actor parameters. + * Note: this function may return {@code null} if there are no parameters for + * the given operation name + */ + protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { + + return operName -> Util.translateToMap(getName() + "." + operName, actorParameters.get(operName)); + } + + /** + * Starts each operation. + */ + @Override + protected void doStart() { + final String actorName = getName(); + logger.info("starting operations for actor {}", actorName); + + for (Operator oper : name2operator.values()) { + if (oper.isConfigured()) { + Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); + + } else { + logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName()); + } + } + } + + /** + * Stops each operation. + */ + @Override + protected void doStop() { + final String actorName = getName(); + logger.info("stopping operations for actor {}", actorName); + + // @formatter:off + name2operator.values().forEach( + oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); + // @formatter:on + } + + /** + * Shuts down each operation. + */ + @Override + protected void doShutdown() { + final String actorName = getName(); + logger.info("shutting down operations for actor {}", actorName); + + // @formatter:off + name2operator.values().forEach(oper -> Util.logException(oper::shutdown, + "failed to shutdown operation {}.{}", actorName, oper.getName())); + // @formatter:on + } + + // TODO old code: remove lines down to **HERE** + + @Override + public String actor() { + return null; + } + + @Override + public List<String> recipes() { + return Collections.emptyList(); + } + + @Override + public List<String> recipeTargets(String recipe) { + return Collections.emptyList(); + } + + @Override + public List<String> recipePayloads(String recipe) { + return Collections.emptyList(); + } + + // **HERE** +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java new file mode 100644 index 000000000..80d8fbd04 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java @@ -0,0 +1,757 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import lombok.Getter; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.Operator; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Partial implementation of an operator. Subclasses can choose to simply implement + * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override + * {@link #doOperationAsFuture(ControlLoopOperationParams)}. + */ +public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator { + + private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class); + + private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString(); + private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString(); + private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString(); + + @Getter + private final String actorName; + + @Getter + private final String name; + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + */ + public OperatorPartial(String actorName, String name) { + super(actorName + "." + name); + this.actorName = actorName; + this.name = name; + } + + /** + * This method does nothing. + */ + @Override + protected void doConfigure(Map<String, Object> parameters) { + // do nothing + } + + /** + * This method does nothing. + */ + @Override + protected void doStart() { + // do nothing + } + + /** + * This method does nothing. + */ + @Override + protected void doStop() { + // do nothing + } + + /** + * This method does nothing. + */ + @Override + protected void doShutdown() { + // do nothing + } + + @Override + public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) { + if (!isAlive()) { + throw new IllegalStateException("operation is not running: " + getFullName()); + } + + final Executor executor = params.getExecutor(); + + // allocate a controller for the entire operation + final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + + CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params); + if (preproc == null) { + // no preprocessor required - just start the operation + return startOperationAttempt(params, controller, 1); + } + + // propagate "stop" to the preprocessor + controller.add(preproc); + + /* + * Do preprocessor first and then, if successful, start the operation. Note: + * operations create their own outcome, ignoring the outcome from any previous + * steps. + */ + preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor) + .thenComposeAsync(handleFailure(params, controller), executor) + .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)), + executor); + + return controller; + } + + /** + * Starts an operation's preprocessor step(s). If the preprocessor fails, then it + * invokes the started and completed call-backs. + * + * @param params operation parameters + * @return a future that will return the preprocessor outcome, or {@code null} if this + * operation needs no preprocessor + */ + protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) { + logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId()); + + final Executor executor = params.getExecutor(); + final ControlLoopOperation operation = params.makeOutcome(); + + final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc = + doPreprocessorAsFuture(params); + if (preproc == null) { + // no preprocessor required + return null; + } + + // allocate a controller for the preprocessor steps + final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + + /* + * Don't mark it complete until we've built the whole pipeline. This will prevent + * the operation from starting until after it has been successfully built (i.e., + * without generating any exceptions). + */ + final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>(); + + // @formatter:off + firstFuture + .thenComposeAsync(controller.add(preproc), executor) + .exceptionally(fromException(params, operation)) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + // start the pipeline + firstFuture.complete(operation); + + return controller; + } + + /** + * Handles a failure in the preprocessor pipeline. If a failure occurred, then it + * invokes the call-backs and returns a failed outcome. Otherwise, it returns the + * outcome that it received. + * + * @param params operation parameters + * @param controller pipeline controller + * @return a function that checks the outcome status and continues, if successful, or + * indicates a failure otherwise + */ + private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure( + ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) { + + return outcome -> { + + if (outcome != null && isSuccess(outcome)) { + logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId()); + return CompletableFuture.completedFuture(outcome); + } + + logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId()); + + final Executor executor = params.getExecutor(); + final CallbackManager callbacks = new CallbackManager(); + + // propagate "stop" to the callbacks + controller.add(callbacks); + + final ControlLoopOperation outcome2 = params.makeOutcome(); + + // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) + + outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString()); + outcome2.setMessage(outcome != null ? outcome.getMessage() : null); + + CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor) + .thenApplyAsync(callbackCompleted(params, callbacks), executor) + .whenCompleteAsync(controller.delayedRemove(callbacks), executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + + return controller; + }; + } + + /** + * Invokes the operation's preprocessor step(s) as a "future". This method simply + * returns {@code null}. + * <p/> + * This method assumes the following: + * <ul> + * <li>the operator is alive</li> + * <li>exceptions generated within the pipeline will be handled by the invoker</li> + * </ul> + * + * @param params operation parameters + * @return a function that will start the preprocessor and returns its outcome, or + * {@code null} if this operation needs no preprocessor + */ + protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( + ControlLoopOperationParams params) { + return null; + } + + /** + * Starts the operation attempt, with no preprocessor. When all retries complete, it + * will complete the controller. + * + * @param params operation parameters + * @param controller controller for all operation attempts + * @param attempt attempt number, typically starting with 1 + * @return a future that will return the final result of all attempts + */ + private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params, + PipelineControllerFuture<ControlLoopOperation> controller, int attempt) { + + final Executor executor = params.getExecutor(); + + CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt); + + // propagate "stop" to the operation attempt + controller.add(future); + + // detach when complete + future.whenCompleteAsync(controller.delayedRemove(future), executor) + .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor()) + .whenCompleteAsync(controller.delayedComplete(), executor); + + return controller; + } + + /** + * Starts the operation attempt, without doing any retries. + * + * @param params operation parameters + * @param attempt attempt number, typically starting with 1 + * @return a future that will return the result of a single operation attempt + */ + private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params, + int attempt) { + + logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); + + final Executor executor = params.getExecutor(); + final ControlLoopOperation outcome = params.makeOutcome(); + final CallbackManager callbacks = new CallbackManager(); + + // this operation attempt gets its own controller + final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + + // propagate "stop" to the callbacks + controller.add(callbacks); + + /* + * Don't mark it complete until we've built the whole pipeline. This will prevent + * the operation from starting until after it has been successfully built (i.e., + * without generating any exceptions). + */ + final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>(); + + // @formatter:off + CompletableFuture<ControlLoopOperation> future2 = + firstFuture.thenComposeAsync(verifyRunning(controller, params), executor) + .thenApplyAsync(callbackStarted(params, callbacks), executor) + .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor); + // @formatter:on + + // handle timeouts, if specified + long timeoutMillis = getTimeOutMillis(params.getPolicy()); + if (timeoutMillis > 0) { + logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId()); + future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + } + + /* + * Note: we re-invoke callbackStarted() just to be sure the callback is invoked + * before callbackCompleted() is invoked. + * + * Note: no need to remove "callbacks" from the pipeline, as we're going to stop + * the pipeline as the last step anyway. + */ + + // @formatter:off + future2.exceptionally(fromException(params, outcome)) + .thenApplyAsync(setRetryFlag(params, attempt), executor) + .thenApplyAsync(callbackStarted(params, callbacks), executor) + .thenApplyAsync(callbackCompleted(params, callbacks), executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + // start the pipeline + firstFuture.complete(outcome); + + return controller; + } + + /** + * Determines if the outcome was successful. + * + * @param outcome outcome to examine + * @return {@code true} if the outcome was successful + */ + protected boolean isSuccess(ControlLoopOperation outcome) { + return OUTCOME_SUCCESS.equals(outcome.getOutcome()); + } + + /** + * Determines if the outcome was a failure for this operator. + * + * @param outcome outcome to examine, or {@code null} + * @return {@code true} if the outcome is not {@code null} and was a failure + * <i>and</i> was associated with this operator, {@code false} otherwise + */ + protected boolean isActorFailed(ControlLoopOperation outcome) { + return OUTCOME_FAILURE.equals(getActorOutcome(outcome)); + } + + /** + * Invokes the operation as a "future". This method simply invokes + * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future". + * <p/> + * This method assumes the following: + * <ul> + * <li>the operator is alive</li> + * <li>verifyRunning() has been invoked</li> + * <li>callbackStarted() has been invoked</li> + * <li>the invoker will perform appropriate timeout checks</li> + * <li>exceptions generated within the pipeline will be handled by the invoker</li> + * </ul> + * + * @param params operation parameters + * @param attempt attempt number, typically starting with 1 + * @return a function that will start the operation and return its result when + * complete + */ + protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture( + ControlLoopOperationParams params, int attempt) { + + /* + * TODO As doOperation() may perform blocking I/O, this should be launched in its + * own thread to prevent the ForkJoinPool from being tied up. Should probably + * provide a method to make that easy. + */ + + return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation), + params.getExecutor()); + } + + /** + * Low-level method that performs the operation. This can make the same assumptions + * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This + * method throws an {@link UnsupportedOperationException}. + * + * @param params operation parameters + * @param attempt attempt number, typically starting with 1 + * @param operation the operation being performed + * @return the outcome of the operation + */ + protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, + ControlLoopOperation operation) { + + throw new UnsupportedOperationException("start operation " + getFullName()); + } + + /** + * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is + * FAILURE, assuming the policy specifies retries and the retry count has been + * exhausted. + * + * @param params operation parameters + * @param attempt latest attempt number, starting with 1 + * @return a function to get the next future to execute + */ + private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params, + int attempt) { + + return operation -> { + if (operation != null && !isActorFailed(operation)) { + /* + * wrong type or wrong operation - just leave it as is. No need to log + * anything here, as retryOnFailure() will log a message + */ + return operation; + } + + // get a non-null operation + ControlLoopOperation oper2; + if (operation != null) { + oper2 = operation; + } else { + oper2 = params.makeOutcome(); + oper2.setOutcome(OUTCOME_FAILURE); + } + + if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0 + && attempt > params.getPolicy().getRetry()) { + /* + * retries were specified and we've already tried them all - change to + * FAILURE_RETRIES + */ + logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); + oper2.setOutcome(OUTCOME_RETRIES); + } + + return oper2; + }; + } + + /** + * Restarts the operation if it was a FAILURE. Assumes that + * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and + * thus that the "operation" is not {@code null}. + * + * @param params operation parameters + * @param controller controller for all of the retries + * @param attempt latest attempt number, starting with 1 + * @return a function to get the next future to execute + */ + private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure( + ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller, + int attempt) { + + return operation -> { + if (!isActorFailed(operation)) { + // wrong type or wrong operation - just leave it as is + logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId()); + return CompletableFuture.completedFuture(operation); + } + + if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) { + // no retries - already marked as FAILURE, so just return it + logger.info("operation {} no retries for {}", getFullName(), params.getRequestId()); + return CompletableFuture.completedFuture(operation); + } + + + /* + * Retry the operation. + */ + logger.info("retry operation {} for {}", getFullName(), params.getRequestId()); + + return startOperationAttempt(params, controller, attempt + 1); + }; + } + + /** + * Gets the outcome of an operation for this operation. + * + * @param operation operation whose outcome is to be extracted + * @return the outcome of the given operation, if it's for this operator, {@code null} + * otherwise + */ + protected String getActorOutcome(ControlLoopOperation operation) { + if (operation == null) { + return null; + } + + if (!getActorName().equals(operation.getActor())) { + return null; + } + + if (!getName().equals(operation.getOperation())) { + return null; + } + + return operation.getOutcome(); + } + + /** + * Gets a function that will start the next step, if the current operation was + * successful, or just return the current operation, otherwise. + * + * @param params operation parameters + * @param nextStep function that will invoke the next step, passing it the operation + * @return a function that will start the next step + */ + protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess( + ControlLoopOperationParams params, + Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) { + + return operation -> { + + if (operation == null) { + logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId()); + ControlLoopOperation outcome = params.makeOutcome(); + outcome.setOutcome(OUTCOME_FAILURE); + return CompletableFuture.completedFuture(outcome); + + } else if (isSuccess(operation)) { + logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId()); + return nextStep.apply(operation); + + } else { + logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId()); + return CompletableFuture.completedFuture(operation); + } + }; + } + + /** + * Converts an exception into an operation outcome, returning a copy of the outcome to + * prevent background jobs from changing it. + * + * @param params operation parameters + * @param operation current operation + * @return a function that will convert an exception into an operation outcome + */ + private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params, + ControlLoopOperation operation) { + + return thrown -> { + logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(), + params.getRequestId(), thrown); + + /* + * Must make a copy of the operation, as the original could be changed by + * background jobs that might still be running. + */ + return setOutcome(params, new ControlLoopOperation(operation), thrown); + }; + } + + /** + * Gets a function to verify that the operation is still running. If the pipeline is + * not running, then it returns an incomplete future, which will effectively halt + * subsequent operations in the pipeline. This method is intended to be used with one + * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods. + * + * @param controller pipeline controller + * @param params operation parameters + * @return a function to verify that the operation is still running + */ + protected <T> Function<T, CompletableFuture<T>> verifyRunning( + PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) { + + return value -> { + boolean running = controller.isRunning(); + logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId()); + + return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>()); + }; + } + + /** + * Sets the start time of the operation and invokes the callback to indicate that the + * operation has started. Does nothing if the pipeline has been stopped. + * <p/> + * This assumes that the "outcome" is not {@code null}. + * + * @param params operation parameters + * @param callbacks used to determine if the start callback can be invoked + * @return a function that sets the start time and invokes the callback + */ + private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params, + CallbackManager callbacks) { + + return outcome -> { + + if (callbacks.canStart()) { + // haven't invoked "start" callback yet + outcome.setStart(callbacks.getStartTime()); + outcome.setEnd(null); + params.callbackStarted(outcome); + } + + return outcome; + }; + } + + /** + * Sets the end time of the operation and invokes the callback to indicate that the + * operation has completed. Does nothing if the pipeline has been stopped. + * <p/> + * This assumes that the "outcome" is not {@code null}. + * <p/> + * Note: the start time must be a reference rather than a plain value, because it's + * value must be gotten on-demand, when the returned function is executed at a later + * time. + * + * @param params operation parameters + * @param callbacks used to determine if the end callback can be invoked + * @return a function that sets the end time and invokes the callback + */ + private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params, + CallbackManager callbacks) { + + return operation -> { + + if (callbacks.canEnd()) { + operation.setStart(callbacks.getStartTime()); + operation.setEnd(callbacks.getEndTime()); + params.callbackCompleted(operation); + } + + return operation; + }; + } + + /** + * Sets an operation's outcome and message, based on a throwable. + * + * @param params operation parameters + * @param operation operation to be updated + * @return the updated operation + */ + protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation, + Throwable thrown) { + PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); + return setOutcome(params, operation, result); + } + + /** + * Sets an operation's outcome and default message based on the result. + * + * @param params operation parameters + * @param operation operation to be updated + * @param result result of the operation + * @return the updated operation + */ + protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation, + PolicyResult result) { + logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); + operation.setOutcome(result.toString()); + operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG + : ControlLoopOperation.FAILED_MSG); + + return operation; + } + + /** + * Determines if a throwable is due to a timeout. + * + * @param thrown throwable of interest + * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise + */ + protected boolean isTimeout(Throwable thrown) { + if (thrown instanceof CompletionException) { + thrown = thrown.getCause(); + } + + return (thrown instanceof TimeoutException); + } + + // these may be overridden by junit tests + + /** + * Gets the operation timeout. Subclasses may override this method to obtain the + * timeout in some other way (e.g., through configuration properties). + * + * @param policy policy from which to extract the timeout + * @return the operation timeout, in milliseconds + */ + protected long getTimeOutMillis(Policy policy) { + Integer timeoutSec = policy.getTimeout(); + return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); + } + + /** + * Manager for "start" and "end" callbacks. + */ + private static class CallbackManager implements Runnable { + private final AtomicReference<Instant> startTime = new AtomicReference<>(); + private final AtomicReference<Instant> endTime = new AtomicReference<>(); + + /** + * Determines if the "start" callback can be invoked. If so, it sets the + * {@link #startTime} to the current time. + * + * @return {@code true} if the "start" callback can be invoked, {@code false} + * otherwise + */ + public boolean canStart() { + return startTime.compareAndSet(null, Instant.now()); + } + + /** + * Determines if the "end" callback can be invoked. If so, it sets the + * {@link #endTime} to the current time. + * + * @return {@code true} if the "end" callback can be invoked, {@code false} + * otherwise + */ + public boolean canEnd() { + return endTime.compareAndSet(null, Instant.now()); + } + + /** + * Gets the start time. + * + * @return the start time, or {@code null} if {@link #canStart()} has not been + * invoked yet. + */ + public Instant getStartTime() { + return startTime.get(); + } + + /** + * Gets the end time. + * + * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked + * yet. + */ + public Instant getEndTime() { + return endTime.get(); + } + + /** + * Prevents further callbacks from being executed by setting {@link #startTime} + * and {@link #endTime}. + */ + @Override + public void run() { + canStart(); + canEnd(); + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java new file mode 100644 index 000000000..6c883f1b5 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/StartConfigPartial.java @@ -0,0 +1,153 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import lombok.Getter; +import org.onap.policy.common.capabilities.Configurable; +import org.onap.policy.common.capabilities.Startable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Partial implementation of an object that is both startable and configurable. It + * provides the high level methods defined in the interface, while deferring the details + * to abstract methods that must be provided by the subclasses. It also manages the + * current {@link #state}. + * + * @param <T> type of parameters expected by {@link #configure(Object)} + */ +public abstract class StartConfigPartial<T> implements Startable, Configurable<T> { + private static final Logger logger = LoggerFactory.getLogger(StartConfigPartial.class); + + @Getter + private final String fullName; + + public enum State { + IDLE, CONFIGURED, ALIVE + } + + private State state = State.IDLE; + + /** + * Constructs the object. + * + * @param fullName full name of this object, used for logging and exception purposes + */ + public StartConfigPartial(String fullName) { + this.fullName = fullName; + } + + @Override + public synchronized boolean isAlive() { + return (state == State.ALIVE); + } + + /** + * Determines if this object has been configured. + * + * @return {@code true} if this object has been configured, {@code false} otherwise + */ + public synchronized boolean isConfigured() { + return (state != State.IDLE); + } + + @Override + public synchronized void configure(T parameters) { + if (isAlive()) { + throw new IllegalStateException("attempt to reconfigure, but already running " + getFullName()); + } + + logger.info("initializing {}", getFullName()); + + doConfigure(parameters); + + state = State.CONFIGURED; + } + + @Override + public synchronized boolean start() { + switch (state) { + case ALIVE: + logger.info("{} is already running", getFullName()); + break; + + case CONFIGURED: + logger.info("starting {}", getFullName()); + doStart(); + state = State.ALIVE; + break; + + case IDLE: + default: + throw new IllegalStateException("attempt to start unconfigured " + getFullName()); + } + + return true; + } + + @Override + public synchronized boolean stop() { + if (isAlive()) { + logger.info("stopping {}", getFullName()); + state = State.CONFIGURED; + doStop(); + + } else { + logger.info("{} is not running", getFullName()); + } + + return true; + } + + @Override + public synchronized void shutdown() { + if (!isAlive()) { + logger.info("{} is not running", getFullName()); + return; + } + + logger.info("shutting down actor {}", getFullName()); + state = State.CONFIGURED; + doShutdown(); + } + + /** + * Configures this object. + * + * @param parameters configuration parameters + */ + protected abstract void doConfigure(T parameters); + + /** + * Starts this object. + */ + protected abstract void doStart(); + + /** + * Stops this object. + */ + protected abstract void doStop(); + + /** + * Shuts down this object. + */ + protected abstract void doShutdown(); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java new file mode 100644 index 000000000..08aba81f2 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java @@ -0,0 +1,210 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Consumer; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.onap.policy.common.parameters.BeanValidationResult; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.annotations.NotNull; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; +import org.onap.policy.controlloop.policy.Policy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Parameters for control loop operations. The executor defaults to + * {@link ForkJoinPool#commonPool()}, but may be overridden. + */ +@Getter +@Builder(toBuilder = true) +@AllArgsConstructor +@EqualsAndHashCode +public class ControlLoopOperationParams { + + private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class); + + public static final String UNKNOWN = "-unknown-"; + + + /** + * The actor service in which to find the actor/operation. + */ + @NotNull + private ActorService actorService; + + /** + * The event for which the operation applies. + */ + @NotNull + private ControlLoopEventContext context; + + /** + * The executor to use to run the operation. + */ + @NotNull + @Builder.Default + private Executor executor = ForkJoinPool.commonPool(); + + /** + * The policy associated with the operation. + */ + @NotNull + private Policy policy; + + /** + * The function to invoke when the operation starts. This is optional. + * <p/> + * Note: this may be invoked multiple times, but with different actor/operations. That + * may happen if the current operation requires other operations to be performed first + * (e.g., A&AI queries, guard checks). + */ + private Consumer<ControlLoopOperation> startCallback; + + /** + * The function to invoke when the operation completes. This is optional. + * <p/> + * Note: this may be invoked multiple times, but with different actor/operations. That + * may happen if the current operation requires other operations to be performed first + * (e.g., A&AI queries, guard checks). + */ + private Consumer<ControlLoopOperation> completeCallback; + + /** + * Target entity. + */ + @NotNull + private String target; + + /** + * Starts the specified operation. + * + * @return a future that will return the result of the operation + * @throws IllegalArgumentException if the parameters are invalid + */ + public CompletableFuture<ControlLoopOperation> start() { + BeanValidationResult result = validate(); + if (!result.isValid()) { + logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(), + result.getResult()); + throw new IllegalArgumentException("invalid parameters"); + } + + // @formatter:off + return actorService + .getActor(policy.getActor()) + .getOperator(policy.getRecipe()) + .startOperation(this); + // @formatter:on + } + + /** + * Gets the name of the actor from the policy. + * + * @return the actor name, or {@link #UNKNOWN} if no name is available + */ + public String getActor() { + return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor()); + } + + /** + * Gets the name of the operation from the policy. + * + * @return the operation name, or {@link #UNKNOWN} if no name is available + */ + public String getOperation() { + return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe()); + } + + /** + * Gets the requested ID of the associated event. + * + * @return the event's request ID, or {@code null} if no request ID is available + */ + public UUID getRequestId() { + return (context == null || context.getEvent() == null ? null : context.getEvent().getRequestId()); + } + + /** + * Makes an operation outcome, populating it from the parameters. + * + * @return a new operation outcome + */ + public ControlLoopOperation makeOutcome() { + ControlLoopOperation operation = new ControlLoopOperation(); + operation.setActor(getActor()); + operation.setOperation(getOperation()); + operation.setTarget(target); + + return operation; + } + + /** + * Invokes the callback to indicate that the operation has started. Any exceptions + * generated by the callback are logged, but not re-thrown. + * + * @param operation the operation that is being started + */ + public void callbackStarted(ControlLoopOperation operation) { + logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId()); + + if (startCallback != null) { + Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", + operation.getActor(), operation.getOperation(), getRequestId()); + } + } + + /** + * Invokes the callback to indicate that the operation has completed. Any exceptions + * generated by the callback are logged, but not re-thrown. + * + * @param operation the operation that is being started + */ + public void callbackCompleted(ControlLoopOperation operation) { + logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(), + operation.getOutcome(), getRequestId()); + + if (completeCallback != null) { + Util.logException(() -> completeCallback.accept(operation), + "{}.{}: complete-callback threw an exception for {}", operation.getActor(), + operation.getOperation(), getRequestId()); + } + } + + /** + * Validates the parameters. + * + * @return the validation result + */ + public BeanValidationResult validate() { + return new BeanValidator().validateTop(ControlLoopOperationParams.class.getSimpleName(), this); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java new file mode 100644 index 000000000..da4fb4f0c --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import java.util.Map; +import java.util.function.Function; +import lombok.Data; +import org.onap.policy.common.parameters.BeanValidationResult; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotBlank; +import org.onap.policy.common.parameters.annotations.NotNull; +import org.onap.policy.controlloop.actorserviceprovider.Util; + +/** + * Parameters used by Actors that connect to a server via HTTP. This contains the + * parameters that are common to all of the operations. Only the path changes for each + * operation, thus it includes a mapping from operation name to path. + */ +@Data +@NotNull +@NotBlank +public class HttpActorParams { + + /** + * Name of the HttpClient, as found in the HttpClientFactory. + */ + private String clientName; + + /** + * Amount of time, in seconds to wait for the HTTP request to complete, where zero + * indicates that it should wait forever. The default is zero. + */ + @Min(0) + private long timeoutSec = 0; + + /** + * Maps the operation name to its URI path. + */ + private Map<String, String> path; + + /** + * Extracts a specific operation's parameters from "this". + * + * @param name name of the item containing "this" + * @return a function to extract an operation's parameters from "this". Note: the + * returned function is not thread-safe + */ + public Function<String, Map<String, Object>> makeOperationParameters(String name) { + HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build(); + + return operation -> { + String subpath = path.get(operation); + if (subpath == null) { + return null; + } + + subparams.setPath(subpath); + return Util.translateToMap(name + "." + operation, subparams); + }; + } + + /** + * Validates the parameters. + * + * @param name name of the object containing these parameters + * @return "this" + * @throws IllegalArgumentException if the parameters are invalid + */ + public HttpActorParams doValidation(String name) { + ValidationResult result = validate(name); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + return this; + } + + /** + * Validates the parameters. + * + * @param resultName name of the result + * + * @return the validation result + */ + public ValidationResult validate(String resultName) { + BeanValidationResult result = new BeanValidator().validateTop(resultName, this); + + result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); + + return result; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java new file mode 100644 index 000000000..695ffe4dd --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import lombok.Builder; +import lombok.Data; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotBlank; +import org.onap.policy.common.parameters.annotations.NotNull; + +/** + * Parameters used by Operators that connect to a server via HTTP. + */ +@NotNull +@NotBlank +@Data +@Builder(toBuilder = true) +public class HttpParams { + + /** + * Name of the HttpClient, as found in the HttpClientFactory. + */ + private String clientName; + + /** + * URI path. + */ + private String path; + + /** + * Amount of time, in seconds to wait for the HTTP request to complete, where zero + * indicates that it should wait forever. The default is zero. + */ + @Min(0) + @Builder.Default + private long timeoutSec = 0; + + + /** + * Validates the parameters. + * + * @param resultName name of the result + * + * @return the validation result + */ + public ValidationResult validate(String resultName) { + return new BeanValidator().validateTop(resultName, this); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java new file mode 100644 index 000000000..3004e1932 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ParameterValidationRuntimeException.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import lombok.Getter; +import org.onap.policy.common.parameters.ValidationResult; + +/** + * Parameter runtime exception, with an associated validation result. This is used to + * throw an exception while passing a validation result up the chain. + * <p/> + * Note: the validation result is <i>not</i> included in the exception message. + */ +public class ParameterValidationRuntimeException extends RuntimeException { + private static final long serialVersionUID = 1L; + + @Getter + private final transient ValidationResult result; + + + public ParameterValidationRuntimeException(ValidationResult result) { + this.result = result; + } + + public ParameterValidationRuntimeException(String message, ValidationResult result) { + super(message); + this.result = result; + } + + public ParameterValidationRuntimeException(Throwable cause, ValidationResult result) { + super(cause); + this.result = result; + } + + public ParameterValidationRuntimeException(String message, Throwable cause, ValidationResult result) { + super(message, cause); + this.result = result; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java new file mode 100644 index 000000000..9e6d8a15e --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import lombok.Builder; +import lombok.Data; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotBlank; +import org.onap.policy.common.parameters.annotations.NotNull; + +/** + * Parameters used by Operators that connect to a server via DMaaP. + */ +@NotNull +@NotBlank +@Data +@Builder(toBuilder = true) +public class TopicParams { + + /** + * Name of the target topic end point to which requests should be published. + */ + private String target; + + /** + * Source topic end point, from which to read responses. + */ + private String source; + + /** + * Amount of time, in seconds to wait for the response, where zero indicates that it + * should wait forever. The default is zero. + */ + @Min(0) + @Builder.Default + private long timeoutSec = 0; + + /** + * Validates both the publisher and the subscriber parameters. + * + * @param resultName name of the result + * + * @return the validation result + */ + public ValidationResult validate(String resultName) { + return new BeanValidator().validateTop(resultName, this); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java new file mode 100644 index 000000000..aac2f77b7 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/FutureManager.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.pipeline; + +import java.util.IdentityHashMap; +import java.util.concurrent.Future; +import lombok.NoArgsConstructor; + +/** + * Manager that manages both futures and listeners. When {@link #stop()} is called, the + * listeners are executed and the futures are canceled. The various methods synchronize on + * "this" while they manipulate internal data structures. + */ +@NoArgsConstructor +public class FutureManager extends ListenerManager { + + /** + * Maps a future to its listener. Records the {@link Runnable} that is passed to + * {@link ListenerManager#add(Runnable)} when {@link #add(Future)} is invoked. This is + * needed if {@link #remove(Future)} is invoked, so that the same {@link Runnable} is + * used each time. + */ + @SuppressWarnings("rawtypes") + private final IdentityHashMap<Future, Runnable> future2listener = new IdentityHashMap<>(5); + + /** + * Adds a future that is to be canceled when this controller is stopped. Note: if the + * controller is already stopped, then the future will be canceled immediately, within + * the invoking thread. + * + * @param future future to be added + */ + public <T> void add(Future<T> future) { + Runnable listener = () -> future.cancel(false); + + synchronized (this) { + if (future2listener.putIfAbsent(future, listener) != null) { + // this future is already in the map, nothing more to do + return; + } + + if (addOnly(listener)) { + // successfully added + return; + } + } + + runListener(listener); + } + + /** + * Removes a future so that it is not canceled when this controller is stopped. + * + * @param future future to be removed + */ + public synchronized <T> void remove(Future<T> future) { + Runnable listener = future2listener.remove(future); + if (listener != null) { + remove(listener); + } + } + + @Override + public void stop() { + super.stop(); + + synchronized (this) { + future2listener.clear(); + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java new file mode 100644 index 000000000..d34a3fb5b --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java @@ -0,0 +1,115 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.pipeline; + +import java.util.ArrayList; +import java.util.IdentityHashMap; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.onap.policy.controlloop.actorserviceprovider.Util; + +/** + * Listener manager, used by operations within the pipeline to determine if they should + * continue to run. When {@link #stop()} is called, the listeners are executed. The + * various methods synchronize on "this" while they manipulate internal data structures. + */ +@NoArgsConstructor +public class ListenerManager { + + @Getter + private volatile boolean running = true; + + /** + * Listeners to be executed when {@link #stop()} is invoked. + */ + private final IdentityHashMap<Runnable, Void> listeners = new IdentityHashMap<>(5); + + /** + * Indicates that operations within the pipeline should stop executing. + */ + public void stop() { + ArrayList<Runnable> items; + + synchronized (this) { + if (!running) { + return; + } + + running = false; + items = new ArrayList<>(listeners.keySet()); + listeners.clear(); + } + + items.forEach(this::runListener); + } + + /** + * Adds a listener that is to be invoked when this controller is stopped. Note: if the + * controller is already stopped, then the listener will be invoked immediately, + * within the invoking thread. + * + * @param listener listener to be added + */ + public void add(Runnable listener) { + if (!addOnly(listener)) { + runListener(listener); + } + } + + /** + * Adds a listener that is to be invoked when this controller is stopped. Note: if the + * controller is already stopped, then the listener will be invoked immediately, + * within the invoking thread. + * + * @param listener listener to be added + * @return {@code true} if the the listener was added, {@code false} if it could not + * be added because this manager has already been stopped + */ + protected boolean addOnly(Runnable listener) { + synchronized (this) { + if (running) { + listeners.put(listener, null); + return true; + } + } + + return false; + } + + /** + * Runs a listener, catching any exceptions that it may throw. + * + * @param listener listener to be executed + */ + protected void runListener(Runnable listener) { + // TODO do this asynchronously? + Util.logException(listener, "pipeline listener {} threw an exception", listener); + } + + /** + * Removes a listener so that it is not invoked when this controller is stopped. + * + * @param listener listener to be removed + */ + public synchronized void remove(Runnable listener) { + listeners.remove(listener); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java new file mode 100644 index 000000000..96c8f9e05 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java @@ -0,0 +1,157 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.pipeline; + +import static org.onap.policy.controlloop.actorserviceprovider.Util.ident; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Function; +import lombok.NoArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Pipeline controller, used by operations within the pipeline to determine if they should + * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the + * pipeline. + */ +@NoArgsConstructor +public class PipelineControllerFuture<T> extends CompletableFuture<T> { + + private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class); + + /** + * Tracks items added to this controller via one of the <i>add</i> methods. + */ + private final FutureManager futures = new FutureManager(); + + + /** + * Cancels and stops the pipeline, in that order. + */ + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + try { + logger.trace("{}: cancel future", ident(this)); + return super.cancel(mayInterruptIfRunning); + + } finally { + futures.stop(); + } + } + + /** + * Generates a function that, when invoked, will remove the given future. This is + * typically added onto the end of a pipeline via one of the + * {@link CompletableFuture#whenComplete(BiConsumer)} methods. + * + * @return a function that removes the given future + */ + public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) { + return (value, thrown) -> { + logger.trace("{}: remove future {}", ident(this), ident(future)); + remove(future); + }; + } + + /** + * Generates a function that, when invoked, will remove the given listener. This is + * typically added onto the end of a pipeline via one of the + * {@link CompletableFuture#whenComplete(BiConsumer)} methods. + * + * @return a function that removes the given listener + */ + public BiConsumer<T, Throwable> delayedRemove(Runnable listener) { + return (value, thrown) -> { + logger.trace("{}: remove listener {}", ident(this), ident(listener)); + remove(listener); + }; + } + + /** + * Generates a function that, when invoked, will stop all pipeline listeners and + * complete this future. This is typically added onto the end of a pipeline via one of + * the {@link CompletableFuture#whenComplete(BiConsumer)} methods. + * + * @return a function that stops all pipeline listeners + */ + public BiConsumer<T, Throwable> delayedComplete() { + return (value, thrown) -> { + if (thrown == null) { + logger.trace("{}: complete and stop future", ident(this)); + complete(value); + } else { + logger.trace("{}: complete exceptionally and stop future", ident(this)); + completeExceptionally(thrown); + } + + futures.stop(); + }; + } + + /** + * Adds a function whose return value is to be canceled when this controller is + * stopped. Note: if the controller is already stopped, then the function will + * <i>not</i> be executed. + * + * @param futureMaker function to be invoked in the future + */ + public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) { + + return input -> { + if (!isRunning()) { + logger.trace("{}: discarded new future", ident(this)); + return new CompletableFuture<>(); + } + + CompletableFuture<F> future = futureMaker.apply(input); + add(future); + + return future; + }; + } + + public <F> void add(Future<F> future) { + logger.trace("{}: add future {}", ident(this), ident(future)); + futures.add(future); + } + + public void add(Runnable listener) { + logger.trace("{}: add listener {}", ident(this), ident(listener)); + futures.add(listener); + } + + public boolean isRunning() { + return futures.isRunning(); + } + + public <F> void remove(Future<F> future) { + logger.trace("{}: remove future {}", ident(this), ident(future)); + futures.remove(future); + } + + public void remove(Runnable listener) { + logger.trace("{}: remove listener {}", ident(this), ident(listener)); + futures.remove(listener); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java index 88f3c16eb..620950a3c 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Actor * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018, 2020 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,9 +21,56 @@ package org.onap.policy.controlloop.actorserviceprovider.spi; +import java.util.Collection; + import java.util.List; +import java.util.Map; +import java.util.Set; +import org.onap.policy.common.capabilities.Configurable; +import org.onap.policy.common.capabilities.Startable; +import org.onap.policy.controlloop.actorserviceprovider.Operator; + +/** + * This is the service interface for defining an Actor used in Control Loop Operational + * Policies for performing actions on runtime entities. + * + * @author pameladragosh + * + */ +public interface Actor extends Startable, Configurable<Map<String,Object>> { + + /** + * Gets the name of the actor. + * + * @return the actor name + */ + String getName(); + + /** + * Gets a particular operator. + * + * @param name name of the operation of interest + * @return the desired operation + * @throws IllegalArgumentException if no operation by the given name exists + */ + Operator getOperator(String name); + + /** + * Gets the supported operations. + * + * @return the supported operations + */ + public Collection<Operator> getOperators(); + + /** + * Gets the names of the supported operations. + * + * @return the names of the supported operations + */ + public Set<String> getOperationNames(); + -public interface Actor { + // TODO old code: remove lines down to **HERE** String actor(); @@ -33,4 +80,5 @@ public interface Actor { List<String> recipePayloads(String recipe); + // **HERE** } |