diff options
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
14 files changed, 1100 insertions, 394 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java index 13f09b1ad..2886b1feb 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java @@ -63,7 +63,6 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { return newActor; } - // TODO: should this throw an exception? logger.warn("duplicate actor names for {}: {}, ignoring {}", name, existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName()); return existingActor; @@ -158,7 +157,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { for (Actor actor : name2actor.values()) { if (actor.isConfigured()) { - Util.logException(actor::start, "failed to start actor {}", actor.getName()); + Util.runFunction(actor::start, "failed to start actor {}", actor.getName()); } else { logger.warn("not starting unconfigured actor {}", actor.getName()); @@ -170,7 +169,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { protected void doStop() { logger.info("stopping actors"); name2actor.values() - .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName())); + .forEach(actor -> Util.runFunction(actor::stop, "failed to stop actor {}", actor.getName())); } @Override @@ -179,7 +178,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { // @formatter:off name2actor.values().forEach( - actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName())); + actor -> Util.runFunction(actor::shutdown, "failed to shutdown actor {}", actor.getName())); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java new file mode 100644 index 000000000..d78403809 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java @@ -0,0 +1,119 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import javax.ws.rs.client.InvocationCallback; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handler for a <i>single</i> asynchronous response. + * + * @param <T> response type + */ +@Getter +public abstract class AsyncResponseHandler<T> implements InvocationCallback<T> { + + private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class); + + @Getter(AccessLevel.NONE) + private final PipelineControllerFuture<OperationOutcome> result = new PipelineControllerFuture<>(); + private final ControlLoopOperationParams params; + private final OperationOutcome outcome; + + /** + * Constructs the object. + * + * @param params operation parameters + * @param outcome outcome to be populated based on the response + */ + public AsyncResponseHandler(ControlLoopOperationParams params, OperationOutcome outcome) { + this.params = params; + this.outcome = outcome; + } + + /** + * Handles the given future, arranging to cancel it when the response is received. + * + * @param future future to be handled + * @return a future to be used to cancel or wait for the response + */ + public CompletableFuture<OperationOutcome> handle(Future<T> future) { + result.add(future); + return result; + } + + /** + * Invokes {@link #doComplete()} and then completes "this" with the returned value. + */ + @Override + public void completed(T rawResponse) { + try { + logger.trace("{}.{}: response completed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.complete(doComplete(rawResponse)); + + } catch (RuntimeException e) { + logger.trace("{}.{}: response handler threw an exception for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.completeExceptionally(e); + } + } + + /** + * Invokes {@link #doFailed()} and then completes "this" with the returned value. + */ + @Override + public void failed(Throwable throwable) { + try { + logger.trace("{}.{}: response failure for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.complete(doFailed(throwable)); + + } catch (RuntimeException e) { + logger.trace("{}.{}: response failure handler threw an exception for {}", params.getActor(), + params.getOperation(), params.getRequestId()); + result.completeExceptionally(e); + } + } + + /** + * Completes the processing of a response. + * + * @param rawResponse raw response that was received + * @return the outcome + */ + protected abstract OperationOutcome doComplete(T rawResponse); + + /** + * Handles a response exception. + * + * @param thrown exception that was thrown + * @return the outcome + */ + protected abstract OperationOutcome doFailed(Throwable thrown); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java new file mode 100644 index 000000000..7d7c1d902 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Manager for "start" and "end" callbacks. + */ +public class CallbackManager implements Runnable { + private final AtomicReference<Instant> startTime = new AtomicReference<>(); + private final AtomicReference<Instant> endTime = new AtomicReference<>(); + + /** + * Determines if the "start" callback can be invoked. If so, it sets the + * {@link #startTime} to the current time. + * + * @return {@code true} if the "start" callback can be invoked, {@code false} + * otherwise + */ + public boolean canStart() { + return startTime.compareAndSet(null, Instant.now()); + } + + /** + * Determines if the "end" callback can be invoked. If so, it sets the + * {@link #endTime} to the current time. + * + * @return {@code true} if the "end" callback can be invoked, {@code false} + * otherwise + */ + public boolean canEnd() { + return endTime.compareAndSet(null, Instant.now()); + } + + /** + * Gets the start time. + * + * @return the start time, or {@code null} if {@link #canStart()} has not been + * invoked yet. + */ + public Instant getStartTime() { + return startTime.get(); + } + + /** + * Gets the end time. + * + * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked + * yet. + */ + public Instant getEndTime() { + return endTime.get(); + } + + /** + * Prevents further callbacks from being executed by setting {@link #startTime} + * and {@link #endTime}. + */ + @Override + public void run() { + canStart(); + canEnd(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java new file mode 100644 index 000000000..6b0924807 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java @@ -0,0 +1,116 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.time.Instant; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.policy.PolicyResult; + +/** + * Outcome from an operation. Objects of this type are passed from one stage to the next. + */ +@Data +@NoArgsConstructor +public class OperationOutcome { + private String actor; + private String operation; + private String target; + private Instant start; + private Instant end; + private String subRequestId; + private PolicyResult result = PolicyResult.SUCCESS; + private String message; + + /** + * Copy constructor. + * + * @param source source object from which to copy + */ + public OperationOutcome(OperationOutcome source) { + this.actor = source.actor; + this.operation = source.operation; + this.target = source.target; + this.start = source.start; + this.end = source.end; + this.subRequestId = source.subRequestId; + this.result = source.result; + this.message = source.message; + } + + /** + * Creates a {@link ControlLoopOperation}, populating all fields with the values from + * this object. Sets the outcome field to the string representation of this object's + * outcome. + * + * @return + */ + public ControlLoopOperation toControlLoopOperation() { + ControlLoopOperation clo = new ControlLoopOperation(); + + clo.setActor(actor); + clo.setOperation(operation); + clo.setTarget(target); + clo.setStart(start); + clo.setEnd(end); + clo.setSubRequestId(subRequestId); + clo.setOutcome(result.toString()); + clo.setMessage(message); + + return clo; + } + + /** + * Determines if this outcome is for the given actor and operation. + * + * @param actor actor name + * @param operation operation name + * @return {@code true} if this outcome is for the given actor and operation + */ + public boolean isFor(@NonNull String actor, @NonNull String operation) { + // do the operation check first, as it's most likely to be unique + return (operation.equals(this.operation) && actor.equals(this.actor)); + } + + /** + * Determines if an outcome is for the given actor and operation. + * + * @param outcome outcome to be examined, or {@code null} + * @param actor actor name + * @param operation operation name + * @return {@code true} if this outcome is for the given actor and operation, + * {@code false} it is {@code null} or not for the actor/operation + */ + public static boolean isFor(OperationOutcome outcome, String actor, String operation) { + return (outcome != null && outcome.isFor(actor, operation)); + } + + /** + * Sets the result. + * + * @param result new result + */ + public void setResult(@NonNull PolicyResult result) { + this.result = result; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java index e308ee42e..c09460e34 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.onap.policy.common.capabilities.Configurable; import org.onap.policy.common.capabilities.Startable; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; /** @@ -54,5 +53,5 @@ public interface Operator extends Startable, Configurable<Map<String, Object>> { * @param params parameters needed to start the operation * @return a future that can be used to cancel or await the result of the operation */ - CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params); + CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java index 0aba1a7fa..c3ddd17f3 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java @@ -23,6 +23,9 @@ package org.onap.policy.controlloop.actorserviceprovider; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.Map; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; @@ -53,6 +56,82 @@ public class Util { } /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param request request to be logged + */ + public static <T> void logRestRequest(String url, T request) { + logRestRequest(new StandardCoder(), url, request); + } + + /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param coder coder to be used to pretty-print the request + * @param url request URL + * @param request request to be logged + */ + protected static <T> void logRestRequest(Coder coder, String url, T request) { + String json; + try { + if (request instanceof String) { + json = request.toString(); + } else { + json = coder.encode(request, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print request", e); + json = request.toString(); + } + + NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); + logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + /** + * Logs a REST response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param response response to be logged + */ + public static <T> void logRestResponse(String url, T response) { + logRestResponse(new StandardCoder(), url, response); + } + + /** + * Logs a REST response. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param coder coder to be used to pretty-print the response + * @param url request URL + * @param response response to be logged + */ + protected static <T> void logRestResponse(Coder coder, String url, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = coder.encode(response, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print response", e); + json = response.toString(); + } + + NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); + logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + /** * Runs a function and logs a message if it throws an exception. Does <i>not</i> * re-throw the exception. * @@ -60,7 +139,7 @@ public class Util { * @param exceptionMessage message to log if an exception is thrown * @param exceptionArgs arguments to be passed to the logger */ - public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) { + public static void runFunction(Runnable function, String exceptionMessage, Object... exceptionArgs) { try { function.run(); diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java index 68bbe7edc..cd4d2570f 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java @@ -22,11 +22,12 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop; import java.io.Serializable; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; -import org.onap.policy.aai.AaiCqResponse; import org.onap.policy.controlloop.VirtualControlLoopEvent; /** @@ -37,23 +38,35 @@ import org.onap.policy.controlloop.VirtualControlLoopEvent; public class ControlLoopEventContext implements Serializable { private static final long serialVersionUID = 1L; - @Setter(AccessLevel.NONE) + private final VirtualControlLoopEvent event; - private AaiCqResponse aaiCqResponse; + /** + * Enrichment data extracted from the event. Never {@code null}, though it may be + * immutable. + */ + private final Map<String, String> enrichment; - // TODO may remove this if it proves not to be needed @Getter(AccessLevel.NONE) @Setter(AccessLevel.NONE) private Map<String, Serializable> properties = new ConcurrentHashMap<>(); /** + * Request ID extracted from the event, or a generated value if the event has no + * request id; never {@code null}. + */ + private final UUID requestId; + + + /** * Constructs the object. * * @param event event with which this is associated */ - public ControlLoopEventContext(VirtualControlLoopEvent event) { + public ControlLoopEventContext(@NonNull VirtualControlLoopEvent event) { this.event = event; + this.requestId = (event.getRequestId() != null ? event.getRequestId() : UUID.randomUUID()); + this.enrichment = (event.getAai() != null ? event.getAai() : Map.of()); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java index 9b9aa914e..d7f322e8a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java @@ -20,14 +20,12 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import com.google.common.collect.ImmutableMap; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.controlloop.actorserviceprovider.Operator; @@ -46,44 +44,42 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement /** * Maps a name to an operator. */ - private Map<String, Operator> name2operator; + private final Map<String, Operator> name2operator = new ConcurrentHashMap<>(); /** * Constructs the object. * * @param name actor name - * @param operators the operations supported by this actor */ - public ActorImpl(String name, Operator... operators) { + public ActorImpl(String name) { super(name); - setOperators(Arrays.asList(operators)); } /** - * Sets the operators supported by this actor, overriding any previous list. + * Adds an operator supported by this actor. * - * @param operators the operations supported by this actor + * @param operator operation to be added */ - protected void setOperators(List<Operator> operators) { + protected synchronized void addOperator(Operator operator) { + /* + * This method is "synchronized" to prevent the state from changing while the + * operator is added. The map, itself, does not need synchronization as it's a + * concurrent map. + */ + if (isConfigured()) { throw new IllegalStateException("attempt to set operators on a configured actor: " + getName()); } - Map<String, Operator> map = new HashMap<>(); - for (Operator newOp : operators) { - map.compute(newOp.getName(), (opName, existingOp) -> { - if (existingOp == null) { - return newOp; - } - - // TODO: should this throw an exception? - logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, - existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName()); - return existingOp; - }); - } + name2operator.compute(operator.getName(), (opName, existingOp) -> { + if (existingOp == null) { + return operator; + } - this.name2operator = ImmutableMap.copyOf(map); + logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, + existingOp.getClass().getSimpleName(), operator.getClass().getSimpleName()); + return existingOp; + }); } @Override @@ -177,7 +173,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement for (Operator oper : name2operator.values()) { if (oper.isConfigured()) { - Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); + Util.runFunction(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); } else { logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName()); @@ -195,7 +191,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement // @formatter:off name2operator.values().forEach( - oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); + oper -> Util.runFunction(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); // @formatter:on } @@ -208,7 +204,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement logger.info("shutting down operations for actor {}", actorName); // @formatter:off - name2operator.values().forEach(oper -> Util.logException(oper::shutdown, + name2operator.values().forEach(oper -> Util.runFunction(oper::shutdown, "failed to shutdown operation {}.{}", actorName, oper.getName())); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java new file mode 100644 index 000000000..28b7b3924 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.function.Function; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams; + +/** + * Actor that uses HTTP, where the only additional property that an operator needs is a + * URL. The actor's parameters must be an {@link HttpActorParams} and its operator + * parameters are expected to be an {@link HttpParams}. + */ +public class HttpActor extends ActorImpl { + + /** + * Constructs the object. + * + * @param name actor's name + */ + public HttpActor(String name) { + super(name); + } + + /** + * Translates the parameters to an {@link HttpActorParams} and then creates a function + * that will extract operator-specific parameters. + */ + @Override + protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { + String actorName = getName(); + + // @formatter:off + return Util.translate(actorName, actorParameters, HttpActorParams.class) + .doValidation(actorName) + .makeOperationParameters(actorName); + // @formatter:on + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java new file mode 100644 index 000000000..566492907 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.onap.policy.common.endpoints.http.client.HttpClientFactory; +import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +/** + * Operator that uses HTTP. The operator's parameters must be a {@link HttpParams}. + */ +public class HttpOperator extends OperatorPartial { + + @Getter(AccessLevel.PROTECTED) + private HttpClient client; + + @Getter + private long timeoutSec; + + /** + * URI path for this particular operation. + */ + @Getter + private String path; + + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + */ + public HttpOperator(String actorName, String name) { + super(actorName, name); + } + + /** + * Translates the parameters to an {@link HttpParams} and then extracts the relevant + * values. + */ + @Override + protected void doConfigure(Map<String, Object> parameters) { + HttpParams params = Util.translate(getFullName(), parameters, HttpParams.class); + ValidationResult result = params.validate(getFullName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + client = getClientFactory().get(params.getClientName()); + path = params.getPath(); + timeoutSec = params.getTimeoutSec(); + } + + // these may be overridden by junits + + protected HttpClientFactory getClientFactory() { + return HttpClientFactoryInstance.getClientFactory(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java index 80d8fbd04..df5258d71 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java @@ -20,37 +20,61 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Function; +import lombok.AccessLevel; import lombok.Getter; +import lombok.Setter; import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Operator; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.policy.Policy; import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Partial implementation of an operator. Subclasses can choose to simply implement - * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override - * {@link #doOperationAsFuture(ControlLoopOperationParams)}. + * Partial implementation of an operator. In general, it's preferable that subclasses + * would override + * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome) + * startOperationAsync()}. However, if that proves to be too difficult, then they can + * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) + * doOperation()}. In addition, if the operation requires any preprocessor steps, the + * subclass may choose to override + * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}. + * <p/> + * The futures returned by the methods within this class can be canceled, and will + * propagate the cancellation to any subtasks. Thus it is also expected that any futures + * returned by overridden methods will do the same. Of course, if a class overrides + * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()}, + * then there's little that can be done to cancel that particular operation. */ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator { private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class); - private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString(); - private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString(); - private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString(); + /** + * Executor to be used for tasks that may perform blocking I/O. The default executor + * simply launches a new thread for each command that is submitted to it. + * <p/> + * May be overridden by junit tests. + */ + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private Executor blockingExecutor = command -> { + Thread thread = new Thread(command); + thread.setDaemon(true); + thread.start(); + }; @Getter private final String actorName; @@ -103,94 +127,52 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } @Override - public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) { + public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) { if (!isAlive()) { throw new IllegalStateException("operation is not running: " + getFullName()); } - final Executor executor = params.getExecutor(); - // allocate a controller for the entire operation - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params); + CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params); if (preproc == null) { // no preprocessor required - just start the operation return startOperationAttempt(params, controller, 1); } - // propagate "stop" to the preprocessor - controller.add(preproc); - /* * Do preprocessor first and then, if successful, start the operation. Note: * operations create their own outcome, ignoring the outcome from any previous * steps. + * + * Wrap the preprocessor to ensure "stop" is propagated to it. */ - preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor) - .thenComposeAsync(handleFailure(params, controller), executor) - .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)), - executor); - - return controller; - } - - /** - * Starts an operation's preprocessor step(s). If the preprocessor fails, then it - * invokes the started and completed call-backs. - * - * @param params operation parameters - * @return a future that will return the preprocessor outcome, or {@code null} if this - * operation needs no preprocessor - */ - protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) { - logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId()); - - final Executor executor = params.getExecutor(); - final ControlLoopOperation operation = params.makeOutcome(); - - final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc = - doPreprocessorAsFuture(params); - if (preproc == null) { - // no preprocessor required - return null; - } - - // allocate a controller for the preprocessor steps - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); - - /* - * Don't mark it complete until we've built the whole pipeline. This will prevent - * the operation from starting until after it has been successfully built (i.e., - * without generating any exceptions). - */ - final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>(); - // @formatter:off - firstFuture - .thenComposeAsync(controller.add(preproc), executor) - .exceptionally(fromException(params, operation)) - .whenCompleteAsync(controller.delayedComplete(), executor); + controller.wrap(preproc) + .exceptionally(fromException(params, "preprocessor of operation")) + .thenCompose(handlePreprocessorFailure(params, controller)) + .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1)); // @formatter:on - // start the pipeline - firstFuture.complete(operation); - return controller; } /** * Handles a failure in the preprocessor pipeline. If a failure occurred, then it - * invokes the call-backs and returns a failed outcome. Otherwise, it returns the - * outcome that it received. + * invokes the call-backs, marks the controller complete, and returns an incomplete + * future, effectively halting the pipeline. Otherwise, it returns the outcome that it + * received. + * <p/> + * Assumes that no callbacks have been invoked yet. * * @param params operation parameters * @param controller pipeline controller * @return a function that checks the outcome status and continues, if successful, or * indicates a failure otherwise */ - private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure( - ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) { + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure( + ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) { return outcome -> { @@ -207,19 +189,21 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj // propagate "stop" to the callbacks controller.add(callbacks); - final ControlLoopOperation outcome2 = params.makeOutcome(); + final OperationOutcome outcome2 = params.makeOutcome(); // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) - outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString()); + outcome2.setResult(PolicyResult.FAILURE_GUARD); outcome2.setMessage(outcome != null ? outcome.getMessage() : null); - CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenApplyAsync(callbackCompleted(params, callbacks), executor) - .whenCompleteAsync(controller.delayedRemove(callbacks), executor) + // @formatter:off + CompletableFuture.completedFuture(outcome2) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .whenCompleteAsync(callbackCompleted(params, callbacks), executor) .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on - return controller; + return new CompletableFuture<>(); }; } @@ -237,8 +221,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return a function that will start the preprocessor and returns its outcome, or * {@code null} if this operation needs no preprocessor */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { return null; } @@ -251,20 +234,12 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt attempt number, typically starting with 1 * @return a future that will return the final result of all attempts */ - private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params, - PipelineControllerFuture<ControlLoopOperation> controller, int attempt) { - - final Executor executor = params.getExecutor(); - - CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt); + private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, int attempt) { // propagate "stop" to the operation attempt - controller.add(future); - - // detach when complete - future.whenCompleteAsync(controller.delayedRemove(future), executor) - .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor()) - .whenCompleteAsync(controller.delayedComplete(), executor); + controller.wrap(startAttemptWithoutRetries(params, attempt)) + .thenCompose(retryOnFailure(params, controller, attempt)); return controller; } @@ -276,40 +251,32 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt attempt number, typically starting with 1 * @return a future that will return the result of a single operation attempt */ - private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params, + private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params, int attempt) { logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); final Executor executor = params.getExecutor(); - final ControlLoopOperation outcome = params.makeOutcome(); + final OperationOutcome outcome = params.makeOutcome(); final CallbackManager callbacks = new CallbackManager(); // this operation attempt gets its own controller - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); // propagate "stop" to the callbacks controller.add(callbacks); - /* - * Don't mark it complete until we've built the whole pipeline. This will prevent - * the operation from starting until after it has been successfully built (i.e., - * without generating any exceptions). - */ - final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>(); - // @formatter:off - CompletableFuture<ControlLoopOperation> future2 = - firstFuture.thenComposeAsync(verifyRunning(controller, params), executor) - .thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor); + CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2))); // @formatter:on // handle timeouts, if specified - long timeoutMillis = getTimeOutMillis(params.getPolicy()); + long timeoutMillis = getTimeOutMillis(params.getTimeoutSec()); if (timeoutMillis > 0) { logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId()); - future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); } /* @@ -321,16 +288,13 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj */ // @formatter:off - future2.exceptionally(fromException(params, outcome)) - .thenApplyAsync(setRetryFlag(params, attempt), executor) - .thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenApplyAsync(callbackCompleted(params, callbacks), executor) + future.exceptionally(fromException(params, "operation")) + .thenApply(setRetryFlag(params, attempt)) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .whenCompleteAsync(callbackCompleted(params, callbacks), executor) .whenCompleteAsync(controller.delayedComplete(), executor); // @formatter:on - // start the pipeline - firstFuture.complete(outcome); - return controller; } @@ -340,8 +304,8 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param outcome outcome to examine * @return {@code true} if the outcome was successful */ - protected boolean isSuccess(ControlLoopOperation outcome) { - return OUTCOME_SUCCESS.equals(outcome.getOutcome()); + protected boolean isSuccess(OperationOutcome outcome) { + return (outcome.getResult() == PolicyResult.SUCCESS); } /** @@ -351,13 +315,29 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return {@code true} if the outcome is not {@code null} and was a failure * <i>and</i> was associated with this operator, {@code false} otherwise */ - protected boolean isActorFailed(ControlLoopOperation outcome) { - return OUTCOME_FAILURE.equals(getActorOutcome(outcome)); + protected boolean isActorFailed(OperationOutcome outcome) { + return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); + } + + /** + * Determines if the given outcome is for this operation. + * + * @param outcome outcome to examine + * @return {@code true} if the outcome is for this operation, {@code false} otherwise + */ + protected boolean isSameOperation(OperationOutcome outcome) { + return OperationOutcome.isFor(outcome, getActorName(), getName()); } /** * Invokes the operation as a "future". This method simply invokes - * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future". + * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor + * "blocking executor"}, returning the result via a "future". + * <p/> + * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using + * the executor in the "params", as that may bring the background thread pool to a + * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used + * instead. * <p/> * This method assumes the following: * <ul> @@ -373,31 +353,23 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return a function that will start the operation and return its result when * complete */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture( - ControlLoopOperationParams params, int attempt) { - - /* - * TODO As doOperation() may perform blocking I/O, this should be launched in its - * own thread to prevent the ForkJoinPool from being tied up. Should probably - * provide a method to make that easy. - */ + protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt, + OperationOutcome outcome) { - return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation), - params.getExecutor()); + return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor()); } /** * Low-level method that performs the operation. This can make the same assumptions * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This - * method throws an {@link UnsupportedOperationException}. + * particular method simply throws an {@link UnsupportedOperationException}. * * @param params operation parameters * @param attempt attempt number, typically starting with 1 * @param operation the operation being performed * @return the outcome of the operation */ - protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, - ControlLoopOperation operation) { + protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) { throw new UnsupportedOperationException("start operation " + getFullName()); } @@ -411,8 +383,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt latest attempt number, starting with 1 * @return a function to get the next future to execute */ - private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params, - int attempt) { + private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) { return operation -> { if (operation != null && !isActorFailed(operation)) { @@ -424,22 +395,22 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } // get a non-null operation - ControlLoopOperation oper2; + OperationOutcome oper2; if (operation != null) { oper2 = operation; } else { oper2 = params.makeOutcome(); - oper2.setOutcome(OUTCOME_FAILURE); + oper2.setResult(PolicyResult.FAILURE); } - if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0 - && attempt > params.getPolicy().getRetry()) { + Integer retry = params.getRetry(); + if (retry != null && retry > 0 && attempt > retry) { /* * retries were specified and we've already tried them all - change to * FAILURE_RETRIES */ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); - oper2.setOutcome(OUTCOME_RETRIES); + oper2.setResult(PolicyResult.FAILURE_RETRIES); } return oper2; @@ -456,21 +427,24 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt latest attempt number, starting with 1 * @return a function to get the next future to execute */ - private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure( - ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller, + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure( + ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller, int attempt) { return operation -> { if (!isActorFailed(operation)) { // wrong type or wrong operation - just leave it as is logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + controller.complete(operation); + return new CompletableFuture<>(); } - if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) { + Integer retry = params.getRetry(); + if (retry == null || retry <= 0) { // no retries - already marked as FAILURE, so just return it logger.info("operation {} no retries for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + controller.complete(operation); + return new CompletableFuture<>(); } @@ -484,100 +458,279 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } /** - * Gets the outcome of an operation for this operation. + * Converts an exception into an operation outcome, returning a copy of the outcome to + * prevent background jobs from changing it. * - * @param operation operation whose outcome is to be extracted - * @return the outcome of the given operation, if it's for this operator, {@code null} - * otherwise + * @param params operation parameters + * @param type type of item throwing the exception + * @return a function that will convert an exception into an operation outcome */ - protected String getActorOutcome(ControlLoopOperation operation) { - if (operation == null) { - return null; - } + private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) { - if (!getActorName().equals(operation.getActor())) { - return null; - } + return thrown -> { + OperationOutcome outcome = params.makeOutcome(); + + logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId(), thrown); + + return setOutcome(params, outcome, thrown); + }; + } + + /** + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params, + List<CompletableFuture<OperationOutcome>> futures) { + + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any + * outstanding futures when one completes. + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + final Executor executor = params.getExecutor(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + // @formatter:off + CompletableFuture.anyOf(futures) + .thenApply(object -> (OperationOutcome) object) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; + } + + /** + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels + * the futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params, + List<CompletableFuture<OperationOutcome>> futures) { - if (!getName().equals(operation.getOperation())) { - return null; + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = allOf(params, arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the + * futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + + @SuppressWarnings("rawtypes") + CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + + // record the outcomes of each future when it completes + for (int count = 0; count < futures2.length; ++count) { + final int count2 = count; + futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); } - return operation.getOutcome(); + CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes)); + + return controller; } /** - * Gets a function that will start the next step, if the current operation was - * successful, or just return the current operation, otherwise. + * Attaches the given futures to the controller. + * + * @param controller master controller for all of the futures + * @param futures futures to be attached to the controller + */ + private void attachFutures(PipelineControllerFuture<OperationOutcome> controller, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + // attach each task + for (CompletableFuture<OperationOutcome> future : futures) { + controller.add(future); + } + } + + /** + * Combines the outcomes from a set of tasks. * * @param params operation parameters - * @param nextStep function that will invoke the next step, passing it the operation - * @return a function that will start the next step + * @param future future to be completed with the combined result + * @param outcomes outcomes to be examined */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess( - ControlLoopOperationParams params, - Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) { + private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params, + CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) { - return operation -> { + return (unused, thrown) -> { + if (thrown != null) { + future.completeExceptionally(thrown); + return; + } - if (operation == null) { - logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId()); - ControlLoopOperation outcome = params.makeOutcome(); - outcome.setOutcome(OUTCOME_FAILURE); - return CompletableFuture.completedFuture(outcome); + // identify the outcome with the highest priority + OperationOutcome outcome = outcomes[0]; + int priority = detmPriority(outcome); - } else if (isSuccess(operation)) { - logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId()); - return nextStep.apply(operation); + // start with "1", as we've already dealt with "0" + for (int count = 1; count < outcomes.length; ++count) { + OperationOutcome outcome2 = outcomes[count]; + int priority2 = detmPriority(outcome2); - } else { - logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + if (priority2 > priority) { + outcome = outcome2; + priority = priority2; + } } + + logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(), + (outcome == null ? null : outcome.getResult()), params.getRequestId()); + + future.complete(outcome); }; } /** - * Converts an exception into an operation outcome, returning a copy of the outcome to - * prevent background jobs from changing it. + * Determines the priority of an outcome based on its result. * - * @param params operation parameters - * @param operation current operation - * @return a function that will convert an exception into an operation outcome + * @param outcome outcome to examine, or {@code null} + * @return the outcome's priority */ - private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params, - ControlLoopOperation operation) { + protected int detmPriority(OperationOutcome outcome) { + if (outcome == null) { + return 1; + } - return thrown -> { - logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(), - params.getRequestId(), thrown); + switch (outcome.getResult()) { + case SUCCESS: + return 0; + + case FAILURE_GUARD: + return 2; + + case FAILURE_RETRIES: + return 3; + + case FAILURE: + return 4; + + case FAILURE_TIMEOUT: + return 5; + + case FAILURE_EXCEPTION: + default: + return 6; + } + } + + /** + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. + * + * @param params operation parameters + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param outcome outcome of the previous task + * @param tasks tasks to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task's future. Otherwise, it returns an incomplete future and completes + * the controller instead. + */ + // @formatter:off + protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, + boolean checkSuccess, OperationOutcome outcome, + CompletableFuture<OperationOutcome> task) { + // @formatter:on + if (checkSuccess && !isSuccess(outcome)) { /* - * Must make a copy of the operation, as the original could be changed by - * background jobs that might still be running. + * must complete before canceling so that cancel() doesn't cause controller to + * complete */ - return setOutcome(params, new ControlLoopOperation(operation), thrown); - }; + controller.complete(outcome); + task.cancel(false); + return new CompletableFuture<>(); + } + + return controller.wrap(task); } /** - * Gets a function to verify that the operation is still running. If the pipeline is - * not running, then it returns an incomplete future, which will effectively halt - * subsequent operations in the pipeline. This method is intended to be used with one - * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods. + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. * - * @param controller pipeline controller * @param params operation parameters - * @return a function to verify that the operation is still running + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param tasks tasks to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task's future. Otherwise, it returns an incomplete future and completes + * the controller instead. */ - protected <T> Function<T, CompletableFuture<T>> verifyRunning( - PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) { + // @formatter:off + protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, + boolean checkSuccess, + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) { + // @formatter:on + + return outcome -> { + + if (!controller.isRunning()) { + return new CompletableFuture<>(); + } - return value -> { - boolean running = controller.isRunning(); - logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId()); + if (checkSuccess && !isSuccess(outcome)) { + controller.complete(outcome); + return new CompletableFuture<>(); + } - return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>()); + return controller.wrap(task.apply(outcome)); }; } @@ -591,10 +744,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param callbacks used to determine if the start callback can be invoked * @return a function that sets the start time and invokes the callback */ - private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params, + private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params, CallbackManager callbacks) { - return outcome -> { + return (outcome, thrown) -> { if (callbacks.canStart()) { // haven't invoked "start" callback yet @@ -602,8 +755,6 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj outcome.setEnd(null); params.callbackStarted(outcome); } - - return outcome; }; } @@ -621,18 +772,16 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param callbacks used to determine if the end callback can be invoked * @return a function that sets the end time and invokes the callback */ - private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params, + private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params, CallbackManager callbacks) { - return operation -> { + return (outcome, thrown) -> { if (callbacks.canEnd()) { - operation.setStart(callbacks.getStartTime()); - operation.setEnd(callbacks.getEndTime()); - params.callbackCompleted(operation); + outcome.setStart(callbacks.getStartTime()); + outcome.setEnd(callbacks.getEndTime()); + params.callbackCompleted(outcome); } - - return operation; }; } @@ -643,7 +792,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param operation operation to be updated * @return the updated operation */ - protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation, + protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation, Throwable thrown) { PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); return setOutcome(params, operation, result); @@ -657,10 +806,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param result result of the operation * @return the updated operation */ - protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation, + protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation, PolicyResult result) { logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); - operation.setOutcome(result.toString()); + operation.setResult(result); operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG : ControlLoopOperation.FAILED_MSG); @@ -687,71 +836,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * Gets the operation timeout. Subclasses may override this method to obtain the * timeout in some other way (e.g., through configuration properties). * - * @param policy policy from which to extract the timeout + * @param timeoutSec timeout, in seconds, or {@code null} * @return the operation timeout, in milliseconds */ - protected long getTimeOutMillis(Policy policy) { - Integer timeoutSec = policy.getTimeout(); + protected long getTimeOutMillis(Integer timeoutSec) { return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); } - - /** - * Manager for "start" and "end" callbacks. - */ - private static class CallbackManager implements Runnable { - private final AtomicReference<Instant> startTime = new AtomicReference<>(); - private final AtomicReference<Instant> endTime = new AtomicReference<>(); - - /** - * Determines if the "start" callback can be invoked. If so, it sets the - * {@link #startTime} to the current time. - * - * @return {@code true} if the "start" callback can be invoked, {@code false} - * otherwise - */ - public boolean canStart() { - return startTime.compareAndSet(null, Instant.now()); - } - - /** - * Determines if the "end" callback can be invoked. If so, it sets the - * {@link #endTime} to the current time. - * - * @return {@code true} if the "end" callback can be invoked, {@code false} - * otherwise - */ - public boolean canEnd() { - return endTime.compareAndSet(null, Instant.now()); - } - - /** - * Gets the start time. - * - * @return the start time, or {@code null} if {@link #canStart()} has not been - * invoked yet. - */ - public Instant getStartTime() { - return startTime.get(); - } - - /** - * Gets the end time. - * - * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked - * yet. - */ - public Instant getEndTime() { - return endTime.get(); - } - - /** - * Prevents further callbacks from being executed by setting {@link #startTime} - * and {@link #endTime}. - */ - @Override - public void run() { - canStart(); - canEnd(); - } - } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java index 08aba81f2..57fce40d7 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java @@ -20,6 +20,7 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -32,11 +33,11 @@ import lombok.Getter; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.BeanValidator; import org.onap.policy.common.parameters.annotations.NotNull; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Util; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; -import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.Target; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,36 +50,67 @@ import org.slf4j.LoggerFactory; @AllArgsConstructor @EqualsAndHashCode public class ControlLoopOperationParams { - private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class); - public static final String UNKNOWN = "-unknown-"; - + /** + * Actor name. + */ + @NotNull + private String actor; /** - * The actor service in which to find the actor/operation. + * Actor service in which to find the actor/operation. */ @NotNull private ActorService actorService; /** - * The event for which the operation applies. + * Event for which the operation applies. */ @NotNull private ControlLoopEventContext context; /** - * The executor to use to run the operation. + * Executor to use to run the operation. */ @NotNull @Builder.Default private Executor executor = ForkJoinPool.commonPool(); /** - * The policy associated with the operation. + * Operation name. + */ + @NotNull + private String operation; + + /** + * Payload data for the request. + */ + private Map<String, String> payload; + + /** + * Number of retries allowed, or {@code null} if no retries. + */ + private Integer retry; + + /** + * The entity's target information. May be {@code null}, depending on the requirement + * of the operation to be invoked. + */ + private Target target; + + /** + * Target entity. */ @NotNull - private Policy policy; + private String targetEntity; + + /** + * Timeout, in seconds, or {@code null} if no timeout. Zero and negative values also + * imply no timeout. + */ + @Builder.Default + private Integer timeoutSec = 300; /** * The function to invoke when the operation starts. This is optional. @@ -87,7 +119,7 @@ public class ControlLoopOperationParams { * may happen if the current operation requires other operations to be performed first * (e.g., A&AI queries, guard checks). */ - private Consumer<ControlLoopOperation> startCallback; + private Consumer<OperationOutcome> startCallback; /** * The function to invoke when the operation completes. This is optional. @@ -96,13 +128,7 @@ public class ControlLoopOperationParams { * may happen if the current operation requires other operations to be performed first * (e.g., A&AI queries, guard checks). */ - private Consumer<ControlLoopOperation> completeCallback; - - /** - * Target entity. - */ - @NotNull - private String target; + private Consumer<OperationOutcome> completeCallback; /** * Starts the specified operation. @@ -110,7 +136,7 @@ public class ControlLoopOperationParams { * @return a future that will return the result of the operation * @throws IllegalArgumentException if the parameters are invalid */ - public CompletableFuture<ControlLoopOperation> start() { + public CompletableFuture<OperationOutcome> start() { BeanValidationResult result = validate(); if (!result.isValid()) { logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(), @@ -120,31 +146,13 @@ public class ControlLoopOperationParams { // @formatter:off return actorService - .getActor(policy.getActor()) - .getOperator(policy.getRecipe()) + .getActor(getActor()) + .getOperator(getOperation()) .startOperation(this); // @formatter:on } /** - * Gets the name of the actor from the policy. - * - * @return the actor name, or {@link #UNKNOWN} if no name is available - */ - public String getActor() { - return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor()); - } - - /** - * Gets the name of the operation from the policy. - * - * @return the operation name, or {@link #UNKNOWN} if no name is available - */ - public String getOperation() { - return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe()); - } - - /** * Gets the requested ID of the associated event. * * @return the event's request ID, or {@code null} if no request ID is available @@ -158,13 +166,13 @@ public class ControlLoopOperationParams { * * @return a new operation outcome */ - public ControlLoopOperation makeOutcome() { - ControlLoopOperation operation = new ControlLoopOperation(); - operation.setActor(getActor()); - operation.setOperation(getOperation()); - operation.setTarget(target); + public OperationOutcome makeOutcome() { + OperationOutcome outcome = new OperationOutcome(); + outcome.setActor(getActor()); + outcome.setOperation(getOperation()); + outcome.setTarget(targetEntity); - return operation; + return outcome; } /** @@ -173,11 +181,11 @@ public class ControlLoopOperationParams { * * @param operation the operation that is being started */ - public void callbackStarted(ControlLoopOperation operation) { + public void callbackStarted(OperationOutcome operation) { logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId()); if (startCallback != null) { - Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", + Util.runFunction(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", operation.getActor(), operation.getOperation(), getRequestId()); } } @@ -188,12 +196,12 @@ public class ControlLoopOperationParams { * * @param operation the operation that is being started */ - public void callbackCompleted(ControlLoopOperation operation) { + public void callbackCompleted(OperationOutcome operation) { logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(), - operation.getOutcome(), getRequestId()); + operation.getResult(), getRequestId()); if (completeCallback != null) { - Util.logException(() -> completeCallback.accept(operation), + Util.runFunction(() -> completeCallback.accept(operation), "{}.{}: complete-callback threw an exception for {}", operation.getActor(), operation.getOperation(), getRequestId()); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java index d34a3fb5b..1d64a8710 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java @@ -101,7 +101,7 @@ public class ListenerManager { */ protected void runListener(Runnable listener) { // TODO do this asynchronously? - Util.logException(listener, "pipeline listener {} threw an exception", listener); + Util.runFunction(listener, "pipeline listener {} threw an exception", listener); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java index 96c8f9e05..92843e28a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java @@ -23,41 +23,70 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline; import static org.onap.policy.controlloop.actorserviceprovider.Util.ident; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import lombok.NoArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Pipeline controller, used by operations within the pipeline to determine if they should - * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the - * pipeline. + * continue to run. Whenever this is canceled or completed, it automatically cancels all + * futures and runs all listeners that have been added. */ @NoArgsConstructor public class PipelineControllerFuture<T> extends CompletableFuture<T> { private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class); + private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception"; + private static final String CANCEL_MSG = "{}: cancel future"; + private static final String COMPLETE_MSG = "{}: complete future"; + /** * Tracks items added to this controller via one of the <i>add</i> methods. */ private final FutureManager futures = new FutureManager(); - /** - * Cancels and stops the pipeline, in that order. - */ @Override public boolean cancel(boolean mayInterruptIfRunning) { - try { - logger.trace("{}: cancel future", ident(this)); - return super.cancel(mayInterruptIfRunning); + return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this)); + } - } finally { - futures.stop(); - } + @Override + public boolean complete(T value) { + return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this)); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this)); + } + + @Override + public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor); + } + + @Override + public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this))); + } + + @Override + public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) { + logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit); + return super.completeOnTimeout(value, timeout, unit); + } + + @Override + public <U> PipelineControllerFuture<U> newIncompleteFuture() { + return new PipelineControllerFuture<>(); } /** @@ -67,11 +96,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { * * @return a function that removes the given future */ - public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) { - return (value, thrown) -> { - logger.trace("{}: remove future {}", ident(this), ident(future)); - remove(future); - }; + public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) { + return (value, thrown) -> remove(future); } /** @@ -81,11 +107,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { * * @return a function that removes the given listener */ - public BiConsumer<T, Throwable> delayedRemove(Runnable listener) { - return (value, thrown) -> { - logger.trace("{}: remove listener {}", ident(this), ident(listener)); - remove(listener); - }; + public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) { + return (value, thrown) -> remove(listener); } /** @@ -98,25 +121,43 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { public BiConsumer<T, Throwable> delayedComplete() { return (value, thrown) -> { if (thrown == null) { - logger.trace("{}: complete and stop future", ident(this)); complete(value); } else { - logger.trace("{}: complete exceptionally and stop future", ident(this)); completeExceptionally(thrown); } - - futures.stop(); }; } /** + * Adds a future to the controller and arranges for it to be removed from the + * controller when it completes, whether or not it throws an exception. If the + * controller has already been stopped, then the future is canceled and a new, + * incomplete future is returned. + * + * @param future future to be wrapped + * @return a new future + */ + public CompletableFuture<T> wrap(CompletableFuture<T> future) { + if (!isRunning()) { + logger.trace("{}: not running, skipping next task {}", ident(this), ident(future)); + future.cancel(false); + return new CompletableFuture<>(); + } + + add(future); + return future.whenComplete(this.delayedRemove(future)); + } + + /** * Adds a function whose return value is to be canceled when this controller is * stopped. Note: if the controller is already stopped, then the function will * <i>not</i> be executed. * - * @param futureMaker function to be invoked in the future + * @param futureMaker function to be invoked to create the future + * @return a function to create the future and arrange for it to be managed by this + * controller */ - public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) { + public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) { return input -> { if (!isRunning()) { @@ -127,7 +168,7 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { CompletableFuture<F> future = futureMaker.apply(input); add(future); - return future; + return future.whenComplete(delayedRemove(future)); }; } @@ -154,4 +195,26 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { logger.trace("{}: remove listener {}", ident(this), ident(listener)); futures.remove(listener); } + + /** + * Performs an operation, stops the futures, and returns the value from the operation. + * Logs a message using the given arguments. + * + * + * @param <R> type of value to be returned + * @param supplier operation to perform + * @param message message to be logged + * @param args message arguments to fill "{}" place-holders + * @return the operation's result + */ + private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) { + try { + logger.trace(message, args); + return supplier.get(); + + } finally { + logger.trace("{}: stopping this future", ident(this)); + futures.stop(); + } + } } |