From e06578535f6afadac715c04ed03c74c05a075780 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Thu, 6 Feb 2020 21:48:12 -0500 Subject: Clean up and enhancement of Actor re-design Added junits for the remaining code. Enhancements to facilitate implementation of Operators: - Added allOf(), anyOf() facilities - Added AsyncResponseHandler for handling asynchronous I/O via the HttpClient - Added logRestRequest() and logRestResponse() for logging REST requests and responses - Added HttpActor and HttpOperator, which can be used as superclasses - Added doTask() - Lifted data from the event into ControlLoopEventContext Updates per previous review comments: - Changed logException() to runFunction(). - Removed the aaiCqResponse field. - Lifted fields from Policy into ControlLoopOperationParams, eliminating the need to include Policy in the class. OperatorPartial depends on the string values in the ControlLoopOperation being set to one of the string values of PolicyResult. Instead of passing ControlLoopOperation around, the operators should pass around an object that uses PolicyResult directly, rather than depending on the string values being set correctly. Created OperationOutcome for this purpose. Stop pipeline when the controller completes. Use whenComplete() where appropriate. startOperationAsync() should not block. Modified it to launch the task in the background via its own thread. Extracted CallbackManager into its own file. Replaced actor setOperators() with addOperator() Renamed add() to wrap(), and modified it to remove the future when it completes. Fixed the signature on delayedRemove() and delayedComplete(). Replaced xxxAsync() calls with just xxx() calls, where appropriate to avoid the extra overhead of submitting it to a work queue. Renamed handleFailure() to handlePreprocessorFailure(). Updates per WIP review comments Issue-ID: POLICY-1625 Signed-off-by: Jim Hahn Change-Id: Id4c4c7ade979bdb76cc54266837609cc69a22c58 --- .../actorserviceprovider/ActorService.java | 7 +- .../actorserviceprovider/AsyncResponseHandler.java | 119 ++++ .../actorserviceprovider/CallbackManager.java | 84 +++ .../actorserviceprovider/OperationOutcome.java | 116 ++++ .../controlloop/actorserviceprovider/Operator.java | 3 +- .../controlloop/actorserviceprovider/Util.java | 81 ++- .../controlloop/ControlLoopEventContext.java | 23 +- .../actorserviceprovider/impl/ActorImpl.java | 50 +- .../actorserviceprovider/impl/HttpActor.java | 58 ++ .../actorserviceprovider/impl/HttpOperator.java | 84 +++ .../actorserviceprovider/impl/OperatorPartial.java | 640 ++++++++++++--------- .../parameters/ControlLoopOperationParams.java | 108 ++-- .../pipeline/ListenerManager.java | 2 +- .../pipeline/PipelineControllerFuture.java | 119 +++- 14 files changed, 1100 insertions(+), 394 deletions(-) create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java (limited to 'models-interactions/model-actors/actorServiceProvider/src/main') diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java index 13f09b1ad..2886b1feb 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java @@ -63,7 +63,6 @@ public class ActorService extends StartConfigPartial> { return newActor; } - // TODO: should this throw an exception? logger.warn("duplicate actor names for {}: {}, ignoring {}", name, existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName()); return existingActor; @@ -158,7 +157,7 @@ public class ActorService extends StartConfigPartial> { for (Actor actor : name2actor.values()) { if (actor.isConfigured()) { - Util.logException(actor::start, "failed to start actor {}", actor.getName()); + Util.runFunction(actor::start, "failed to start actor {}", actor.getName()); } else { logger.warn("not starting unconfigured actor {}", actor.getName()); @@ -170,7 +169,7 @@ public class ActorService extends StartConfigPartial> { protected void doStop() { logger.info("stopping actors"); name2actor.values() - .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName())); + .forEach(actor -> Util.runFunction(actor::stop, "failed to stop actor {}", actor.getName())); } @Override @@ -179,7 +178,7 @@ public class ActorService extends StartConfigPartial> { // @formatter:off name2actor.values().forEach( - actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName())); + actor -> Util.runFunction(actor::shutdown, "failed to shutdown actor {}", actor.getName())); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java new file mode 100644 index 000000000..d78403809 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java @@ -0,0 +1,119 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import javax.ws.rs.client.InvocationCallback; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handler for a single asynchronous response. + * + * @param response type + */ +@Getter +public abstract class AsyncResponseHandler implements InvocationCallback { + + private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class); + + @Getter(AccessLevel.NONE) + private final PipelineControllerFuture result = new PipelineControllerFuture<>(); + private final ControlLoopOperationParams params; + private final OperationOutcome outcome; + + /** + * Constructs the object. + * + * @param params operation parameters + * @param outcome outcome to be populated based on the response + */ + public AsyncResponseHandler(ControlLoopOperationParams params, OperationOutcome outcome) { + this.params = params; + this.outcome = outcome; + } + + /** + * Handles the given future, arranging to cancel it when the response is received. + * + * @param future future to be handled + * @return a future to be used to cancel or wait for the response + */ + public CompletableFuture handle(Future future) { + result.add(future); + return result; + } + + /** + * Invokes {@link #doComplete()} and then completes "this" with the returned value. + */ + @Override + public void completed(T rawResponse) { + try { + logger.trace("{}.{}: response completed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.complete(doComplete(rawResponse)); + + } catch (RuntimeException e) { + logger.trace("{}.{}: response handler threw an exception for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.completeExceptionally(e); + } + } + + /** + * Invokes {@link #doFailed()} and then completes "this" with the returned value. + */ + @Override + public void failed(Throwable throwable) { + try { + logger.trace("{}.{}: response failure for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.complete(doFailed(throwable)); + + } catch (RuntimeException e) { + logger.trace("{}.{}: response failure handler threw an exception for {}", params.getActor(), + params.getOperation(), params.getRequestId()); + result.completeExceptionally(e); + } + } + + /** + * Completes the processing of a response. + * + * @param rawResponse raw response that was received + * @return the outcome + */ + protected abstract OperationOutcome doComplete(T rawResponse); + + /** + * Handles a response exception. + * + * @param thrown exception that was thrown + * @return the outcome + */ + protected abstract OperationOutcome doFailed(Throwable thrown); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java new file mode 100644 index 000000000..7d7c1d902 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Manager for "start" and "end" callbacks. + */ +public class CallbackManager implements Runnable { + private final AtomicReference startTime = new AtomicReference<>(); + private final AtomicReference endTime = new AtomicReference<>(); + + /** + * Determines if the "start" callback can be invoked. If so, it sets the + * {@link #startTime} to the current time. + * + * @return {@code true} if the "start" callback can be invoked, {@code false} + * otherwise + */ + public boolean canStart() { + return startTime.compareAndSet(null, Instant.now()); + } + + /** + * Determines if the "end" callback can be invoked. If so, it sets the + * {@link #endTime} to the current time. + * + * @return {@code true} if the "end" callback can be invoked, {@code false} + * otherwise + */ + public boolean canEnd() { + return endTime.compareAndSet(null, Instant.now()); + } + + /** + * Gets the start time. + * + * @return the start time, or {@code null} if {@link #canStart()} has not been + * invoked yet. + */ + public Instant getStartTime() { + return startTime.get(); + } + + /** + * Gets the end time. + * + * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked + * yet. + */ + public Instant getEndTime() { + return endTime.get(); + } + + /** + * Prevents further callbacks from being executed by setting {@link #startTime} + * and {@link #endTime}. + */ + @Override + public void run() { + canStart(); + canEnd(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java new file mode 100644 index 000000000..6b0924807 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java @@ -0,0 +1,116 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.time.Instant; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.policy.PolicyResult; + +/** + * Outcome from an operation. Objects of this type are passed from one stage to the next. + */ +@Data +@NoArgsConstructor +public class OperationOutcome { + private String actor; + private String operation; + private String target; + private Instant start; + private Instant end; + private String subRequestId; + private PolicyResult result = PolicyResult.SUCCESS; + private String message; + + /** + * Copy constructor. + * + * @param source source object from which to copy + */ + public OperationOutcome(OperationOutcome source) { + this.actor = source.actor; + this.operation = source.operation; + this.target = source.target; + this.start = source.start; + this.end = source.end; + this.subRequestId = source.subRequestId; + this.result = source.result; + this.message = source.message; + } + + /** + * Creates a {@link ControlLoopOperation}, populating all fields with the values from + * this object. Sets the outcome field to the string representation of this object's + * outcome. + * + * @return + */ + public ControlLoopOperation toControlLoopOperation() { + ControlLoopOperation clo = new ControlLoopOperation(); + + clo.setActor(actor); + clo.setOperation(operation); + clo.setTarget(target); + clo.setStart(start); + clo.setEnd(end); + clo.setSubRequestId(subRequestId); + clo.setOutcome(result.toString()); + clo.setMessage(message); + + return clo; + } + + /** + * Determines if this outcome is for the given actor and operation. + * + * @param actor actor name + * @param operation operation name + * @return {@code true} if this outcome is for the given actor and operation + */ + public boolean isFor(@NonNull String actor, @NonNull String operation) { + // do the operation check first, as it's most likely to be unique + return (operation.equals(this.operation) && actor.equals(this.actor)); + } + + /** + * Determines if an outcome is for the given actor and operation. + * + * @param outcome outcome to be examined, or {@code null} + * @param actor actor name + * @param operation operation name + * @return {@code true} if this outcome is for the given actor and operation, + * {@code false} it is {@code null} or not for the actor/operation + */ + public static boolean isFor(OperationOutcome outcome, String actor, String operation) { + return (outcome != null && outcome.isFor(actor, operation)); + } + + /** + * Sets the result. + * + * @param result new result + */ + public void setResult(@NonNull PolicyResult result) { + this.result = result; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java index e308ee42e..c09460e34 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.onap.policy.common.capabilities.Configurable; import org.onap.policy.common.capabilities.Startable; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; /** @@ -54,5 +53,5 @@ public interface Operator extends Startable, Configurable> { * @param params parameters needed to start the operation * @return a future that can be used to cancel or await the result of the operation */ - CompletableFuture startOperation(ControlLoopOperationParams params); + CompletableFuture startOperation(ControlLoopOperationParams params); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java index 0aba1a7fa..c3ddd17f3 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java @@ -23,6 +23,9 @@ package org.onap.policy.controlloop.actorserviceprovider; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.Map; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; @@ -52,6 +55,82 @@ public class Util { return new DelayedIdentString(object); } + /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param request request to be logged + */ + public static void logRestRequest(String url, T request) { + logRestRequest(new StandardCoder(), url, request); + } + + /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param coder coder to be used to pretty-print the request + * @param url request URL + * @param request request to be logged + */ + protected static void logRestRequest(Coder coder, String url, T request) { + String json; + try { + if (request instanceof String) { + json = request.toString(); + } else { + json = coder.encode(request, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print request", e); + json = request.toString(); + } + + NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); + logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + /** + * Logs a REST response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param response response to be logged + */ + public static void logRestResponse(String url, T response) { + logRestResponse(new StandardCoder(), url, response); + } + + /** + * Logs a REST response. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param coder coder to be used to pretty-print the response + * @param url request URL + * @param response response to be logged + */ + protected static void logRestResponse(Coder coder, String url, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = coder.encode(response, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print response", e); + json = response.toString(); + } + + NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); + logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + /** * Runs a function and logs a message if it throws an exception. Does not * re-throw the exception. @@ -60,7 +139,7 @@ public class Util { * @param exceptionMessage message to log if an exception is thrown * @param exceptionArgs arguments to be passed to the logger */ - public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) { + public static void runFunction(Runnable function, String exceptionMessage, Object... exceptionArgs) { try { function.run(); diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java index 68bbe7edc..cd4d2570f 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java @@ -22,11 +22,12 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop; import java.io.Serializable; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; -import org.onap.policy.aai.AaiCqResponse; import org.onap.policy.controlloop.VirtualControlLoopEvent; /** @@ -37,23 +38,35 @@ import org.onap.policy.controlloop.VirtualControlLoopEvent; public class ControlLoopEventContext implements Serializable { private static final long serialVersionUID = 1L; - @Setter(AccessLevel.NONE) + private final VirtualControlLoopEvent event; - private AaiCqResponse aaiCqResponse; + /** + * Enrichment data extracted from the event. Never {@code null}, though it may be + * immutable. + */ + private final Map enrichment; - // TODO may remove this if it proves not to be needed @Getter(AccessLevel.NONE) @Setter(AccessLevel.NONE) private Map properties = new ConcurrentHashMap<>(); + /** + * Request ID extracted from the event, or a generated value if the event has no + * request id; never {@code null}. + */ + private final UUID requestId; + + /** * Constructs the object. * * @param event event with which this is associated */ - public ControlLoopEventContext(VirtualControlLoopEvent event) { + public ControlLoopEventContext(@NonNull VirtualControlLoopEvent event) { this.event = event; + this.requestId = (event.getRequestId() != null ? event.getRequestId() : UUID.randomUUID()); + this.enrichment = (event.getAai() != null ? event.getAai() : Map.of()); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java index 9b9aa914e..d7f322e8a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java @@ -20,14 +20,12 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import com.google.common.collect.ImmutableMap; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.controlloop.actorserviceprovider.Operator; @@ -46,44 +44,42 @@ public class ActorImpl extends StartConfigPartial> implement /** * Maps a name to an operator. */ - private Map name2operator; + private final Map name2operator = new ConcurrentHashMap<>(); /** * Constructs the object. * * @param name actor name - * @param operators the operations supported by this actor */ - public ActorImpl(String name, Operator... operators) { + public ActorImpl(String name) { super(name); - setOperators(Arrays.asList(operators)); } /** - * Sets the operators supported by this actor, overriding any previous list. + * Adds an operator supported by this actor. * - * @param operators the operations supported by this actor + * @param operator operation to be added */ - protected void setOperators(List operators) { + protected synchronized void addOperator(Operator operator) { + /* + * This method is "synchronized" to prevent the state from changing while the + * operator is added. The map, itself, does not need synchronization as it's a + * concurrent map. + */ + if (isConfigured()) { throw new IllegalStateException("attempt to set operators on a configured actor: " + getName()); } - Map map = new HashMap<>(); - for (Operator newOp : operators) { - map.compute(newOp.getName(), (opName, existingOp) -> { - if (existingOp == null) { - return newOp; - } - - // TODO: should this throw an exception? - logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, - existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName()); - return existingOp; - }); - } + name2operator.compute(operator.getName(), (opName, existingOp) -> { + if (existingOp == null) { + return operator; + } - this.name2operator = ImmutableMap.copyOf(map); + logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, + existingOp.getClass().getSimpleName(), operator.getClass().getSimpleName()); + return existingOp; + }); } @Override @@ -177,7 +173,7 @@ public class ActorImpl extends StartConfigPartial> implement for (Operator oper : name2operator.values()) { if (oper.isConfigured()) { - Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); + Util.runFunction(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); } else { logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName()); @@ -195,7 +191,7 @@ public class ActorImpl extends StartConfigPartial> implement // @formatter:off name2operator.values().forEach( - oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); + oper -> Util.runFunction(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); // @formatter:on } @@ -208,7 +204,7 @@ public class ActorImpl extends StartConfigPartial> implement logger.info("shutting down operations for actor {}", actorName); // @formatter:off - name2operator.values().forEach(oper -> Util.logException(oper::shutdown, + name2operator.values().forEach(oper -> Util.runFunction(oper::shutdown, "failed to shutdown operation {}.{}", actorName, oper.getName())); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java new file mode 100644 index 000000000..28b7b3924 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.function.Function; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams; + +/** + * Actor that uses HTTP, where the only additional property that an operator needs is a + * URL. The actor's parameters must be an {@link HttpActorParams} and its operator + * parameters are expected to be an {@link HttpParams}. + */ +public class HttpActor extends ActorImpl { + + /** + * Constructs the object. + * + * @param name actor's name + */ + public HttpActor(String name) { + super(name); + } + + /** + * Translates the parameters to an {@link HttpActorParams} and then creates a function + * that will extract operator-specific parameters. + */ + @Override + protected Function> makeOperatorParameters(Map actorParameters) { + String actorName = getName(); + + // @formatter:off + return Util.translate(actorName, actorParameters, HttpActorParams.class) + .doValidation(actorName) + .makeOperationParameters(actorName); + // @formatter:on + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java new file mode 100644 index 000000000..566492907 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.onap.policy.common.endpoints.http.client.HttpClientFactory; +import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +/** + * Operator that uses HTTP. The operator's parameters must be a {@link HttpParams}. + */ +public class HttpOperator extends OperatorPartial { + + @Getter(AccessLevel.PROTECTED) + private HttpClient client; + + @Getter + private long timeoutSec; + + /** + * URI path for this particular operation. + */ + @Getter + private String path; + + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + */ + public HttpOperator(String actorName, String name) { + super(actorName, name); + } + + /** + * Translates the parameters to an {@link HttpParams} and then extracts the relevant + * values. + */ + @Override + protected void doConfigure(Map parameters) { + HttpParams params = Util.translate(getFullName(), parameters, HttpParams.class); + ValidationResult result = params.validate(getFullName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + client = getClientFactory().get(params.getClientName()); + path = params.getPath(); + timeoutSec = params.getTimeoutSec(); + } + + // these may be overridden by junits + + protected HttpClientFactory getClientFactory() { + return HttpClientFactoryInstance.getClientFactory(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java index 80d8fbd04..df5258d71 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java @@ -20,37 +20,61 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Function; +import lombok.AccessLevel; import lombok.Getter; +import lombok.Setter; import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Operator; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.policy.Policy; import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Partial implementation of an operator. Subclasses can choose to simply implement - * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override - * {@link #doOperationAsFuture(ControlLoopOperationParams)}. + * Partial implementation of an operator. In general, it's preferable that subclasses + * would override + * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome) + * startOperationAsync()}. However, if that proves to be too difficult, then they can + * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) + * doOperation()}. In addition, if the operation requires any preprocessor steps, the + * subclass may choose to override + * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}. + *

+ * The futures returned by the methods within this class can be canceled, and will + * propagate the cancellation to any subtasks. Thus it is also expected that any futures + * returned by overridden methods will do the same. Of course, if a class overrides + * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()}, + * then there's little that can be done to cancel that particular operation. */ public abstract class OperatorPartial extends StartConfigPartial> implements Operator { private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class); - private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString(); - private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString(); - private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString(); + /** + * Executor to be used for tasks that may perform blocking I/O. The default executor + * simply launches a new thread for each command that is submitted to it. + *

+ * May be overridden by junit tests. + */ + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private Executor blockingExecutor = command -> { + Thread thread = new Thread(command); + thread.setDaemon(true); + thread.start(); + }; @Getter private final String actorName; @@ -103,94 +127,52 @@ public abstract class OperatorPartial extends StartConfigPartial startOperation(ControlLoopOperationParams params) { + public final CompletableFuture startOperation(ControlLoopOperationParams params) { if (!isAlive()) { throw new IllegalStateException("operation is not running: " + getFullName()); } - final Executor executor = params.getExecutor(); - // allocate a controller for the entire operation - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - CompletableFuture preproc = startPreprocessor(params); + CompletableFuture preproc = startPreprocessorAsync(params); if (preproc == null) { // no preprocessor required - just start the operation return startOperationAttempt(params, controller, 1); } - // propagate "stop" to the preprocessor - controller.add(preproc); - /* * Do preprocessor first and then, if successful, start the operation. Note: * operations create their own outcome, ignoring the outcome from any previous * steps. + * + * Wrap the preprocessor to ensure "stop" is propagated to it. */ - preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor) - .thenComposeAsync(handleFailure(params, controller), executor) - .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)), - executor); - - return controller; - } - - /** - * Starts an operation's preprocessor step(s). If the preprocessor fails, then it - * invokes the started and completed call-backs. - * - * @param params operation parameters - * @return a future that will return the preprocessor outcome, or {@code null} if this - * operation needs no preprocessor - */ - protected CompletableFuture startPreprocessor(ControlLoopOperationParams params) { - logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId()); - - final Executor executor = params.getExecutor(); - final ControlLoopOperation operation = params.makeOutcome(); - - final Function> preproc = - doPreprocessorAsFuture(params); - if (preproc == null) { - // no preprocessor required - return null; - } - - // allocate a controller for the preprocessor steps - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - - /* - * Don't mark it complete until we've built the whole pipeline. This will prevent - * the operation from starting until after it has been successfully built (i.e., - * without generating any exceptions). - */ - final CompletableFuture firstFuture = new CompletableFuture<>(); - // @formatter:off - firstFuture - .thenComposeAsync(controller.add(preproc), executor) - .exceptionally(fromException(params, operation)) - .whenCompleteAsync(controller.delayedComplete(), executor); + controller.wrap(preproc) + .exceptionally(fromException(params, "preprocessor of operation")) + .thenCompose(handlePreprocessorFailure(params, controller)) + .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1)); // @formatter:on - // start the pipeline - firstFuture.complete(operation); - return controller; } /** * Handles a failure in the preprocessor pipeline. If a failure occurred, then it - * invokes the call-backs and returns a failed outcome. Otherwise, it returns the - * outcome that it received. + * invokes the call-backs, marks the controller complete, and returns an incomplete + * future, effectively halting the pipeline. Otherwise, it returns the outcome that it + * received. + *

+ * Assumes that no callbacks have been invoked yet. * * @param params operation parameters * @param controller pipeline controller * @return a function that checks the outcome status and continues, if successful, or * indicates a failure otherwise */ - private Function> handleFailure( - ControlLoopOperationParams params, PipelineControllerFuture controller) { + private Function> handlePreprocessorFailure( + ControlLoopOperationParams params, PipelineControllerFuture controller) { return outcome -> { @@ -207,19 +189,21 @@ public abstract class OperatorPartial extends StartConfigPartial(); }; } @@ -237,8 +221,7 @@ public abstract class OperatorPartial extends StartConfigPartial> doPreprocessorAsFuture( - ControlLoopOperationParams params) { + protected CompletableFuture startPreprocessorAsync(ControlLoopOperationParams params) { return null; } @@ -251,20 +234,12 @@ public abstract class OperatorPartial extends StartConfigPartial startOperationAttempt(ControlLoopOperationParams params, - PipelineControllerFuture controller, int attempt) { - - final Executor executor = params.getExecutor(); - - CompletableFuture future = startAttemptWithoutRetries(params, attempt); + private CompletableFuture startOperationAttempt(ControlLoopOperationParams params, + PipelineControllerFuture controller, int attempt) { // propagate "stop" to the operation attempt - controller.add(future); - - // detach when complete - future.whenCompleteAsync(controller.delayedRemove(future), executor) - .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor()) - .whenCompleteAsync(controller.delayedComplete(), executor); + controller.wrap(startAttemptWithoutRetries(params, attempt)) + .thenCompose(retryOnFailure(params, controller, attempt)); return controller; } @@ -276,40 +251,32 @@ public abstract class OperatorPartial extends StartConfigPartial startAttemptWithoutRetries(ControlLoopOperationParams params, + private CompletableFuture startAttemptWithoutRetries(ControlLoopOperationParams params, int attempt) { logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); final Executor executor = params.getExecutor(); - final ControlLoopOperation outcome = params.makeOutcome(); + final OperationOutcome outcome = params.makeOutcome(); final CallbackManager callbacks = new CallbackManager(); // this operation attempt gets its own controller - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); // propagate "stop" to the callbacks controller.add(callbacks); - /* - * Don't mark it complete until we've built the whole pipeline. This will prevent - * the operation from starting until after it has been successfully built (i.e., - * without generating any exceptions). - */ - final CompletableFuture firstFuture = new CompletableFuture<>(); - // @formatter:off - CompletableFuture future2 = - firstFuture.thenComposeAsync(verifyRunning(controller, params), executor) - .thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor); + CompletableFuture future = CompletableFuture.completedFuture(outcome) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2))); // @formatter:on // handle timeouts, if specified - long timeoutMillis = getTimeOutMillis(params.getPolicy()); + long timeoutMillis = getTimeOutMillis(params.getTimeoutSec()); if (timeoutMillis > 0) { logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId()); - future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); } /* @@ -321,16 +288,13 @@ public abstract class OperatorPartial extends StartConfigPartialand was associated with this operator, {@code false} otherwise */ - protected boolean isActorFailed(ControlLoopOperation outcome) { - return OUTCOME_FAILURE.equals(getActorOutcome(outcome)); + protected boolean isActorFailed(OperationOutcome outcome) { + return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); + } + + /** + * Determines if the given outcome is for this operation. + * + * @param outcome outcome to examine + * @return {@code true} if the outcome is for this operation, {@code false} otherwise + */ + protected boolean isSameOperation(OperationOutcome outcome) { + return OperationOutcome.isFor(outcome, getActorName(), getName()); } /** * Invokes the operation as a "future". This method simply invokes - * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future". + * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor + * "blocking executor"}, returning the result via a "future". + *

+ * Note: if the operation uses blocking I/O, then it should not be run using + * the executor in the "params", as that may bring the background thread pool to a + * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used + * instead. *

* This method assumes the following: *

    @@ -373,31 +353,23 @@ public abstract class OperatorPartial extends StartConfigPartial> doOperationAsFuture( - ControlLoopOperationParams params, int attempt) { - - /* - * TODO As doOperation() may perform blocking I/O, this should be launched in its - * own thread to prevent the ForkJoinPool from being tied up. Should probably - * provide a method to make that easy. - */ + protected CompletableFuture startOperationAsync(ControlLoopOperationParams params, int attempt, + OperationOutcome outcome) { - return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation), - params.getExecutor()); + return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor()); } /** * Low-level method that performs the operation. This can make the same assumptions * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This - * method throws an {@link UnsupportedOperationException}. + * particular method simply throws an {@link UnsupportedOperationException}. * * @param params operation parameters * @param attempt attempt number, typically starting with 1 * @param operation the operation being performed * @return the outcome of the operation */ - protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, - ControlLoopOperation operation) { + protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) { throw new UnsupportedOperationException("start operation " + getFullName()); } @@ -411,8 +383,7 @@ public abstract class OperatorPartial extends StartConfigPartial setRetryFlag(ControlLoopOperationParams params, - int attempt) { + private Function setRetryFlag(ControlLoopOperationParams params, int attempt) { return operation -> { if (operation != null && !isActorFailed(operation)) { @@ -424,22 +395,22 @@ public abstract class OperatorPartial extends StartConfigPartial 0 - && attempt > params.getPolicy().getRetry()) { + Integer retry = params.getRetry(); + if (retry != null && retry > 0 && attempt > retry) { /* * retries were specified and we've already tried them all - change to * FAILURE_RETRIES */ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); - oper2.setOutcome(OUTCOME_RETRIES); + oper2.setResult(PolicyResult.FAILURE_RETRIES); } return oper2; @@ -456,21 +427,24 @@ public abstract class OperatorPartial extends StartConfigPartial> retryOnFailure( - ControlLoopOperationParams params, PipelineControllerFuture controller, + private Function> retryOnFailure( + ControlLoopOperationParams params, PipelineControllerFuture controller, int attempt) { return operation -> { if (!isActorFailed(operation)) { // wrong type or wrong operation - just leave it as is logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + controller.complete(operation); + return new CompletableFuture<>(); } - if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) { + Integer retry = params.getRetry(); + if (retry == null || retry <= 0) { // no retries - already marked as FAILURE, so just return it logger.info("operation {} no retries for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + controller.complete(operation); + return new CompletableFuture<>(); } @@ -484,100 +458,279 @@ public abstract class OperatorPartial extends StartConfigPartial fromException(ControlLoopOperationParams params, String type) { - if (!getActorName().equals(operation.getActor())) { - return null; - } + return thrown -> { + OperationOutcome outcome = params.makeOutcome(); + + logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId(), thrown); + + return setOutcome(params, outcome, thrown); + }; + } + + /** + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture anyOf(ControlLoopOperationParams params, + List> futures) { + + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture result = anyOf(params, arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any + * outstanding futures when one completes. + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture anyOf(ControlLoopOperationParams params, + @SuppressWarnings("unchecked") CompletableFuture... futures) { + + final Executor executor = params.getExecutor(); + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + // @formatter:off + CompletableFuture.anyOf(futures) + .thenApply(object -> (OperationOutcome) object) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; + } + + /** + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels + * the futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture allOf(ControlLoopOperationParams params, + List> futures) { - if (!getName().equals(operation.getOperation())) { - return null; + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture result = allOf(params, arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the + * futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture allOf(ControlLoopOperationParams params, + @SuppressWarnings("unchecked") CompletableFuture... futures) { + + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + + @SuppressWarnings("rawtypes") + CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + + // record the outcomes of each future when it completes + for (int count = 0; count < futures2.length; ++count) { + final int count2 = count; + futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); } - return operation.getOutcome(); + CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes)); + + return controller; } /** - * Gets a function that will start the next step, if the current operation was - * successful, or just return the current operation, otherwise. + * Attaches the given futures to the controller. + * + * @param controller master controller for all of the futures + * @param futures futures to be attached to the controller + */ + private void attachFutures(PipelineControllerFuture controller, + @SuppressWarnings("unchecked") CompletableFuture... futures) { + + // attach each task + for (CompletableFuture future : futures) { + controller.add(future); + } + } + + /** + * Combines the outcomes from a set of tasks. * * @param params operation parameters - * @param nextStep function that will invoke the next step, passing it the operation - * @return a function that will start the next step + * @param future future to be completed with the combined result + * @param outcomes outcomes to be examined */ - protected Function> onSuccess( - ControlLoopOperationParams params, - Function> nextStep) { + private BiConsumer combineOutcomes(ControlLoopOperationParams params, + CompletableFuture future, OperationOutcome[] outcomes) { - return operation -> { + return (unused, thrown) -> { + if (thrown != null) { + future.completeExceptionally(thrown); + return; + } - if (operation == null) { - logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId()); - ControlLoopOperation outcome = params.makeOutcome(); - outcome.setOutcome(OUTCOME_FAILURE); - return CompletableFuture.completedFuture(outcome); + // identify the outcome with the highest priority + OperationOutcome outcome = outcomes[0]; + int priority = detmPriority(outcome); - } else if (isSuccess(operation)) { - logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId()); - return nextStep.apply(operation); + // start with "1", as we've already dealt with "0" + for (int count = 1; count < outcomes.length; ++count) { + OperationOutcome outcome2 = outcomes[count]; + int priority2 = detmPriority(outcome2); - } else { - logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + if (priority2 > priority) { + outcome = outcome2; + priority = priority2; + } } + + logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(), + (outcome == null ? null : outcome.getResult()), params.getRequestId()); + + future.complete(outcome); }; } /** - * Converts an exception into an operation outcome, returning a copy of the outcome to - * prevent background jobs from changing it. + * Determines the priority of an outcome based on its result. * - * @param params operation parameters - * @param operation current operation - * @return a function that will convert an exception into an operation outcome + * @param outcome outcome to examine, or {@code null} + * @return the outcome's priority */ - private Function fromException(ControlLoopOperationParams params, - ControlLoopOperation operation) { + protected int detmPriority(OperationOutcome outcome) { + if (outcome == null) { + return 1; + } - return thrown -> { - logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(), - params.getRequestId(), thrown); + switch (outcome.getResult()) { + case SUCCESS: + return 0; + + case FAILURE_GUARD: + return 2; + + case FAILURE_RETRIES: + return 3; + + case FAILURE: + return 4; + + case FAILURE_TIMEOUT: + return 5; + + case FAILURE_EXCEPTION: + default: + return 6; + } + } + + /** + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. + * + * @param params operation parameters + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param outcome outcome of the previous task + * @param tasks tasks to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task's future. Otherwise, it returns an incomplete future and completes + * the controller instead. + */ + // @formatter:off + protected CompletableFuture doTask(ControlLoopOperationParams params, + PipelineControllerFuture controller, + boolean checkSuccess, OperationOutcome outcome, + CompletableFuture task) { + // @formatter:on + if (checkSuccess && !isSuccess(outcome)) { /* - * Must make a copy of the operation, as the original could be changed by - * background jobs that might still be running. + * must complete before canceling so that cancel() doesn't cause controller to + * complete */ - return setOutcome(params, new ControlLoopOperation(operation), thrown); - }; + controller.complete(outcome); + task.cancel(false); + return new CompletableFuture<>(); + } + + return controller.wrap(task); } /** - * Gets a function to verify that the operation is still running. If the pipeline is - * not running, then it returns an incomplete future, which will effectively halt - * subsequent operations in the pipeline. This method is intended to be used with one - * of the {@link CompletableFuture}'s thenCompose() methods. + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. * - * @param controller pipeline controller * @param params operation parameters - * @return a function to verify that the operation is still running + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param tasks tasks to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task's future. Otherwise, it returns an incomplete future and completes + * the controller instead. */ - protected Function> verifyRunning( - PipelineControllerFuture controller, ControlLoopOperationParams params) { + // @formatter:off + protected Function> doTask(ControlLoopOperationParams params, + PipelineControllerFuture controller, + boolean checkSuccess, + Function> task) { + // @formatter:on + + return outcome -> { + + if (!controller.isRunning()) { + return new CompletableFuture<>(); + } - return value -> { - boolean running = controller.isRunning(); - logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId()); + if (checkSuccess && !isSuccess(outcome)) { + controller.complete(outcome); + return new CompletableFuture<>(); + } - return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>()); + return controller.wrap(task.apply(outcome)); }; } @@ -591,10 +744,10 @@ public abstract class OperatorPartial extends StartConfigPartial callbackStarted(ControlLoopOperationParams params, + private BiConsumer callbackStarted(ControlLoopOperationParams params, CallbackManager callbacks) { - return outcome -> { + return (outcome, thrown) -> { if (callbacks.canStart()) { // haven't invoked "start" callback yet @@ -602,8 +755,6 @@ public abstract class OperatorPartial extends StartConfigPartial callbackCompleted(ControlLoopOperationParams params, + private BiConsumer callbackCompleted(ControlLoopOperationParams params, CallbackManager callbacks) { - return operation -> { + return (outcome, thrown) -> { if (callbacks.canEnd()) { - operation.setStart(callbacks.getStartTime()); - operation.setEnd(callbacks.getEndTime()); - params.callbackCompleted(operation); + outcome.setStart(callbacks.getStartTime()); + outcome.setEnd(callbacks.getEndTime()); + params.callbackCompleted(outcome); } - - return operation; }; } @@ -643,7 +792,7 @@ public abstract class OperatorPartial extends StartConfigPartial startTime = new AtomicReference<>(); - private final AtomicReference endTime = new AtomicReference<>(); - - /** - * Determines if the "start" callback can be invoked. If so, it sets the - * {@link #startTime} to the current time. - * - * @return {@code true} if the "start" callback can be invoked, {@code false} - * otherwise - */ - public boolean canStart() { - return startTime.compareAndSet(null, Instant.now()); - } - - /** - * Determines if the "end" callback can be invoked. If so, it sets the - * {@link #endTime} to the current time. - * - * @return {@code true} if the "end" callback can be invoked, {@code false} - * otherwise - */ - public boolean canEnd() { - return endTime.compareAndSet(null, Instant.now()); - } - - /** - * Gets the start time. - * - * @return the start time, or {@code null} if {@link #canStart()} has not been - * invoked yet. - */ - public Instant getStartTime() { - return startTime.get(); - } - - /** - * Gets the end time. - * - * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked - * yet. - */ - public Instant getEndTime() { - return endTime.get(); - } - - /** - * Prevents further callbacks from being executed by setting {@link #startTime} - * and {@link #endTime}. - */ - @Override - public void run() { - canStart(); - canEnd(); - } - } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java index 08aba81f2..57fce40d7 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java @@ -20,6 +20,7 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -32,11 +33,11 @@ import lombok.Getter; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.BeanValidator; import org.onap.policy.common.parameters.annotations.NotNull; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Util; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; -import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.Target; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,36 +50,67 @@ import org.slf4j.LoggerFactory; @AllArgsConstructor @EqualsAndHashCode public class ControlLoopOperationParams { - private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class); - public static final String UNKNOWN = "-unknown-"; - + /** + * Actor name. + */ + @NotNull + private String actor; /** - * The actor service in which to find the actor/operation. + * Actor service in which to find the actor/operation. */ @NotNull private ActorService actorService; /** - * The event for which the operation applies. + * Event for which the operation applies. */ @NotNull private ControlLoopEventContext context; /** - * The executor to use to run the operation. + * Executor to use to run the operation. */ @NotNull @Builder.Default private Executor executor = ForkJoinPool.commonPool(); /** - * The policy associated with the operation. + * Operation name. + */ + @NotNull + private String operation; + + /** + * Payload data for the request. + */ + private Map payload; + + /** + * Number of retries allowed, or {@code null} if no retries. + */ + private Integer retry; + + /** + * The entity's target information. May be {@code null}, depending on the requirement + * of the operation to be invoked. + */ + private Target target; + + /** + * Target entity. */ @NotNull - private Policy policy; + private String targetEntity; + + /** + * Timeout, in seconds, or {@code null} if no timeout. Zero and negative values also + * imply no timeout. + */ + @Builder.Default + private Integer timeoutSec = 300; /** * The function to invoke when the operation starts. This is optional. @@ -87,7 +119,7 @@ public class ControlLoopOperationParams { * may happen if the current operation requires other operations to be performed first * (e.g., A&AI queries, guard checks). */ - private Consumer startCallback; + private Consumer startCallback; /** * The function to invoke when the operation completes. This is optional. @@ -96,13 +128,7 @@ public class ControlLoopOperationParams { * may happen if the current operation requires other operations to be performed first * (e.g., A&AI queries, guard checks). */ - private Consumer completeCallback; - - /** - * Target entity. - */ - @NotNull - private String target; + private Consumer completeCallback; /** * Starts the specified operation. @@ -110,7 +136,7 @@ public class ControlLoopOperationParams { * @return a future that will return the result of the operation * @throws IllegalArgumentException if the parameters are invalid */ - public CompletableFuture start() { + public CompletableFuture start() { BeanValidationResult result = validate(); if (!result.isValid()) { logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(), @@ -120,30 +146,12 @@ public class ControlLoopOperationParams { // @formatter:off return actorService - .getActor(policy.getActor()) - .getOperator(policy.getRecipe()) + .getActor(getActor()) + .getOperator(getOperation()) .startOperation(this); // @formatter:on } - /** - * Gets the name of the actor from the policy. - * - * @return the actor name, or {@link #UNKNOWN} if no name is available - */ - public String getActor() { - return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor()); - } - - /** - * Gets the name of the operation from the policy. - * - * @return the operation name, or {@link #UNKNOWN} if no name is available - */ - public String getOperation() { - return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe()); - } - /** * Gets the requested ID of the associated event. * @@ -158,13 +166,13 @@ public class ControlLoopOperationParams { * * @return a new operation outcome */ - public ControlLoopOperation makeOutcome() { - ControlLoopOperation operation = new ControlLoopOperation(); - operation.setActor(getActor()); - operation.setOperation(getOperation()); - operation.setTarget(target); + public OperationOutcome makeOutcome() { + OperationOutcome outcome = new OperationOutcome(); + outcome.setActor(getActor()); + outcome.setOperation(getOperation()); + outcome.setTarget(targetEntity); - return operation; + return outcome; } /** @@ -173,11 +181,11 @@ public class ControlLoopOperationParams { * * @param operation the operation that is being started */ - public void callbackStarted(ControlLoopOperation operation) { + public void callbackStarted(OperationOutcome operation) { logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId()); if (startCallback != null) { - Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", + Util.runFunction(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", operation.getActor(), operation.getOperation(), getRequestId()); } } @@ -188,12 +196,12 @@ public class ControlLoopOperationParams { * * @param operation the operation that is being started */ - public void callbackCompleted(ControlLoopOperation operation) { + public void callbackCompleted(OperationOutcome operation) { logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(), - operation.getOutcome(), getRequestId()); + operation.getResult(), getRequestId()); if (completeCallback != null) { - Util.logException(() -> completeCallback.accept(operation), + Util.runFunction(() -> completeCallback.accept(operation), "{}.{}: complete-callback threw an exception for {}", operation.getActor(), operation.getOperation(), getRequestId()); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java index d34a3fb5b..1d64a8710 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java @@ -101,7 +101,7 @@ public class ListenerManager { */ protected void runListener(Runnable listener) { // TODO do this asynchronously? - Util.logException(listener, "pipeline listener {} threw an exception", listener); + Util.runFunction(listener, "pipeline listener {} threw an exception", listener); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java index 96c8f9e05..92843e28a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java @@ -23,41 +23,70 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline; import static org.onap.policy.controlloop.actorserviceprovider.Util.ident; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import lombok.NoArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Pipeline controller, used by operations within the pipeline to determine if they should - * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the - * pipeline. + * continue to run. Whenever this is canceled or completed, it automatically cancels all + * futures and runs all listeners that have been added. */ @NoArgsConstructor public class PipelineControllerFuture extends CompletableFuture { private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class); + private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception"; + private static final String CANCEL_MSG = "{}: cancel future"; + private static final String COMPLETE_MSG = "{}: complete future"; + /** * Tracks items added to this controller via one of the add methods. */ private final FutureManager futures = new FutureManager(); - /** - * Cancels and stops the pipeline, in that order. - */ @Override public boolean cancel(boolean mayInterruptIfRunning) { - try { - logger.trace("{}: cancel future", ident(this)); - return super.cancel(mayInterruptIfRunning); + return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this)); + } - } finally { - futures.stop(); - } + @Override + public boolean complete(T value) { + return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this)); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this)); + } + + @Override + public CompletableFuture completeAsync(Supplier supplier, Executor executor) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor); + } + + @Override + public CompletableFuture completeAsync(Supplier supplier) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this))); + } + + @Override + public CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit) { + logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit); + return super.completeOnTimeout(value, timeout, unit); + } + + @Override + public PipelineControllerFuture newIncompleteFuture() { + return new PipelineControllerFuture<>(); } /** @@ -67,11 +96,8 @@ public class PipelineControllerFuture extends CompletableFuture { * * @return a function that removes the given future */ - public BiConsumer delayedRemove(Future future) { - return (value, thrown) -> { - logger.trace("{}: remove future {}", ident(this), ident(future)); - remove(future); - }; + public BiConsumer delayedRemove(Future future) { + return (value, thrown) -> remove(future); } /** @@ -81,11 +107,8 @@ public class PipelineControllerFuture extends CompletableFuture { * * @return a function that removes the given listener */ - public BiConsumer delayedRemove(Runnable listener) { - return (value, thrown) -> { - logger.trace("{}: remove listener {}", ident(this), ident(listener)); - remove(listener); - }; + public BiConsumer delayedRemove(Runnable listener) { + return (value, thrown) -> remove(listener); } /** @@ -98,25 +121,43 @@ public class PipelineControllerFuture extends CompletableFuture { public BiConsumer delayedComplete() { return (value, thrown) -> { if (thrown == null) { - logger.trace("{}: complete and stop future", ident(this)); complete(value); } else { - logger.trace("{}: complete exceptionally and stop future", ident(this)); completeExceptionally(thrown); } - - futures.stop(); }; } + /** + * Adds a future to the controller and arranges for it to be removed from the + * controller when it completes, whether or not it throws an exception. If the + * controller has already been stopped, then the future is canceled and a new, + * incomplete future is returned. + * + * @param future future to be wrapped + * @return a new future + */ + public CompletableFuture wrap(CompletableFuture future) { + if (!isRunning()) { + logger.trace("{}: not running, skipping next task {}", ident(this), ident(future)); + future.cancel(false); + return new CompletableFuture<>(); + } + + add(future); + return future.whenComplete(this.delayedRemove(future)); + } + /** * Adds a function whose return value is to be canceled when this controller is * stopped. Note: if the controller is already stopped, then the function will * not be executed. * - * @param futureMaker function to be invoked in the future + * @param futureMaker function to be invoked to create the future + * @return a function to create the future and arrange for it to be managed by this + * controller */ - public Function> add(Function> futureMaker) { + public Function> wrap(Function> futureMaker) { return input -> { if (!isRunning()) { @@ -127,7 +168,7 @@ public class PipelineControllerFuture extends CompletableFuture { CompletableFuture future = futureMaker.apply(input); add(future); - return future; + return future.whenComplete(delayedRemove(future)); }; } @@ -154,4 +195,26 @@ public class PipelineControllerFuture extends CompletableFuture { logger.trace("{}: remove listener {}", ident(this), ident(listener)); futures.remove(listener); } + + /** + * Performs an operation, stops the futures, and returns the value from the operation. + * Logs a message using the given arguments. + * + * + * @param type of value to be returned + * @param supplier operation to perform + * @param message message to be logged + * @param args message arguments to fill "{}" place-holders + * @return the operation's result + */ + private R doAndStop(Supplier supplier, String message, Object... args) { + try { + logger.trace(message, args); + return supplier.get(); + + } finally { + logger.trace("{}: stopping this future", ident(this)); + futures.stop(); + } + } } -- cgit 1.2.3-korg