diff options
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src')
28 files changed, 2727 insertions, 720 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java index 13f09b1ad..2886b1feb 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java @@ -63,7 +63,6 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { return newActor; } - // TODO: should this throw an exception? logger.warn("duplicate actor names for {}: {}, ignoring {}", name, existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName()); return existingActor; @@ -158,7 +157,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { for (Actor actor : name2actor.values()) { if (actor.isConfigured()) { - Util.logException(actor::start, "failed to start actor {}", actor.getName()); + Util.runFunction(actor::start, "failed to start actor {}", actor.getName()); } else { logger.warn("not starting unconfigured actor {}", actor.getName()); @@ -170,7 +169,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { protected void doStop() { logger.info("stopping actors"); name2actor.values() - .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName())); + .forEach(actor -> Util.runFunction(actor::stop, "failed to stop actor {}", actor.getName())); } @Override @@ -179,7 +178,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { // @formatter:off name2actor.values().forEach( - actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName())); + actor -> Util.runFunction(actor::shutdown, "failed to shutdown actor {}", actor.getName())); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java new file mode 100644 index 000000000..d78403809 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java @@ -0,0 +1,119 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import javax.ws.rs.client.InvocationCallback; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handler for a <i>single</i> asynchronous response. + * + * @param <T> response type + */ +@Getter +public abstract class AsyncResponseHandler<T> implements InvocationCallback<T> { + + private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class); + + @Getter(AccessLevel.NONE) + private final PipelineControllerFuture<OperationOutcome> result = new PipelineControllerFuture<>(); + private final ControlLoopOperationParams params; + private final OperationOutcome outcome; + + /** + * Constructs the object. + * + * @param params operation parameters + * @param outcome outcome to be populated based on the response + */ + public AsyncResponseHandler(ControlLoopOperationParams params, OperationOutcome outcome) { + this.params = params; + this.outcome = outcome; + } + + /** + * Handles the given future, arranging to cancel it when the response is received. + * + * @param future future to be handled + * @return a future to be used to cancel or wait for the response + */ + public CompletableFuture<OperationOutcome> handle(Future<T> future) { + result.add(future); + return result; + } + + /** + * Invokes {@link #doComplete()} and then completes "this" with the returned value. + */ + @Override + public void completed(T rawResponse) { + try { + logger.trace("{}.{}: response completed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.complete(doComplete(rawResponse)); + + } catch (RuntimeException e) { + logger.trace("{}.{}: response handler threw an exception for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.completeExceptionally(e); + } + } + + /** + * Invokes {@link #doFailed()} and then completes "this" with the returned value. + */ + @Override + public void failed(Throwable throwable) { + try { + logger.trace("{}.{}: response failure for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.complete(doFailed(throwable)); + + } catch (RuntimeException e) { + logger.trace("{}.{}: response failure handler threw an exception for {}", params.getActor(), + params.getOperation(), params.getRequestId()); + result.completeExceptionally(e); + } + } + + /** + * Completes the processing of a response. + * + * @param rawResponse raw response that was received + * @return the outcome + */ + protected abstract OperationOutcome doComplete(T rawResponse); + + /** + * Handles a response exception. + * + * @param thrown exception that was thrown + * @return the outcome + */ + protected abstract OperationOutcome doFailed(Throwable thrown); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java new file mode 100644 index 000000000..7d7c1d902 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Manager for "start" and "end" callbacks. + */ +public class CallbackManager implements Runnable { + private final AtomicReference<Instant> startTime = new AtomicReference<>(); + private final AtomicReference<Instant> endTime = new AtomicReference<>(); + + /** + * Determines if the "start" callback can be invoked. If so, it sets the + * {@link #startTime} to the current time. + * + * @return {@code true} if the "start" callback can be invoked, {@code false} + * otherwise + */ + public boolean canStart() { + return startTime.compareAndSet(null, Instant.now()); + } + + /** + * Determines if the "end" callback can be invoked. If so, it sets the + * {@link #endTime} to the current time. + * + * @return {@code true} if the "end" callback can be invoked, {@code false} + * otherwise + */ + public boolean canEnd() { + return endTime.compareAndSet(null, Instant.now()); + } + + /** + * Gets the start time. + * + * @return the start time, or {@code null} if {@link #canStart()} has not been + * invoked yet. + */ + public Instant getStartTime() { + return startTime.get(); + } + + /** + * Gets the end time. + * + * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked + * yet. + */ + public Instant getEndTime() { + return endTime.get(); + } + + /** + * Prevents further callbacks from being executed by setting {@link #startTime} + * and {@link #endTime}. + */ + @Override + public void run() { + canStart(); + canEnd(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java new file mode 100644 index 000000000..6b0924807 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java @@ -0,0 +1,116 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.time.Instant; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.policy.PolicyResult; + +/** + * Outcome from an operation. Objects of this type are passed from one stage to the next. + */ +@Data +@NoArgsConstructor +public class OperationOutcome { + private String actor; + private String operation; + private String target; + private Instant start; + private Instant end; + private String subRequestId; + private PolicyResult result = PolicyResult.SUCCESS; + private String message; + + /** + * Copy constructor. + * + * @param source source object from which to copy + */ + public OperationOutcome(OperationOutcome source) { + this.actor = source.actor; + this.operation = source.operation; + this.target = source.target; + this.start = source.start; + this.end = source.end; + this.subRequestId = source.subRequestId; + this.result = source.result; + this.message = source.message; + } + + /** + * Creates a {@link ControlLoopOperation}, populating all fields with the values from + * this object. Sets the outcome field to the string representation of this object's + * outcome. + * + * @return + */ + public ControlLoopOperation toControlLoopOperation() { + ControlLoopOperation clo = new ControlLoopOperation(); + + clo.setActor(actor); + clo.setOperation(operation); + clo.setTarget(target); + clo.setStart(start); + clo.setEnd(end); + clo.setSubRequestId(subRequestId); + clo.setOutcome(result.toString()); + clo.setMessage(message); + + return clo; + } + + /** + * Determines if this outcome is for the given actor and operation. + * + * @param actor actor name + * @param operation operation name + * @return {@code true} if this outcome is for the given actor and operation + */ + public boolean isFor(@NonNull String actor, @NonNull String operation) { + // do the operation check first, as it's most likely to be unique + return (operation.equals(this.operation) && actor.equals(this.actor)); + } + + /** + * Determines if an outcome is for the given actor and operation. + * + * @param outcome outcome to be examined, or {@code null} + * @param actor actor name + * @param operation operation name + * @return {@code true} if this outcome is for the given actor and operation, + * {@code false} it is {@code null} or not for the actor/operation + */ + public static boolean isFor(OperationOutcome outcome, String actor, String operation) { + return (outcome != null && outcome.isFor(actor, operation)); + } + + /** + * Sets the result. + * + * @param result new result + */ + public void setResult(@NonNull PolicyResult result) { + this.result = result; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java index e308ee42e..c09460e34 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.onap.policy.common.capabilities.Configurable; import org.onap.policy.common.capabilities.Startable; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; /** @@ -54,5 +53,5 @@ public interface Operator extends Startable, Configurable<Map<String, Object>> { * @param params parameters needed to start the operation * @return a future that can be used to cancel or await the result of the operation */ - CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params); + CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java index 0aba1a7fa..c3ddd17f3 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java @@ -23,6 +23,9 @@ package org.onap.policy.controlloop.actorserviceprovider; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.Map; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; @@ -53,6 +56,82 @@ public class Util { } /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param request request to be logged + */ + public static <T> void logRestRequest(String url, T request) { + logRestRequest(new StandardCoder(), url, request); + } + + /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param coder coder to be used to pretty-print the request + * @param url request URL + * @param request request to be logged + */ + protected static <T> void logRestRequest(Coder coder, String url, T request) { + String json; + try { + if (request instanceof String) { + json = request.toString(); + } else { + json = coder.encode(request, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print request", e); + json = request.toString(); + } + + NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); + logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + /** + * Logs a REST response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param response response to be logged + */ + public static <T> void logRestResponse(String url, T response) { + logRestResponse(new StandardCoder(), url, response); + } + + /** + * Logs a REST response. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param coder coder to be used to pretty-print the response + * @param url request URL + * @param response response to be logged + */ + protected static <T> void logRestResponse(Coder coder, String url, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = coder.encode(response, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print response", e); + json = response.toString(); + } + + NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); + logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + /** * Runs a function and logs a message if it throws an exception. Does <i>not</i> * re-throw the exception. * @@ -60,7 +139,7 @@ public class Util { * @param exceptionMessage message to log if an exception is thrown * @param exceptionArgs arguments to be passed to the logger */ - public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) { + public static void runFunction(Runnable function, String exceptionMessage, Object... exceptionArgs) { try { function.run(); diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java index 68bbe7edc..cd4d2570f 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java @@ -22,11 +22,12 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop; import java.io.Serializable; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; -import org.onap.policy.aai.AaiCqResponse; import org.onap.policy.controlloop.VirtualControlLoopEvent; /** @@ -37,23 +38,35 @@ import org.onap.policy.controlloop.VirtualControlLoopEvent; public class ControlLoopEventContext implements Serializable { private static final long serialVersionUID = 1L; - @Setter(AccessLevel.NONE) + private final VirtualControlLoopEvent event; - private AaiCqResponse aaiCqResponse; + /** + * Enrichment data extracted from the event. Never {@code null}, though it may be + * immutable. + */ + private final Map<String, String> enrichment; - // TODO may remove this if it proves not to be needed @Getter(AccessLevel.NONE) @Setter(AccessLevel.NONE) private Map<String, Serializable> properties = new ConcurrentHashMap<>(); /** + * Request ID extracted from the event, or a generated value if the event has no + * request id; never {@code null}. + */ + private final UUID requestId; + + + /** * Constructs the object. * * @param event event with which this is associated */ - public ControlLoopEventContext(VirtualControlLoopEvent event) { + public ControlLoopEventContext(@NonNull VirtualControlLoopEvent event) { this.event = event; + this.requestId = (event.getRequestId() != null ? event.getRequestId() : UUID.randomUUID()); + this.enrichment = (event.getAai() != null ? event.getAai() : Map.of()); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java index 9b9aa914e..d7f322e8a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java @@ -20,14 +20,12 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import com.google.common.collect.ImmutableMap; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.controlloop.actorserviceprovider.Operator; @@ -46,44 +44,42 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement /** * Maps a name to an operator. */ - private Map<String, Operator> name2operator; + private final Map<String, Operator> name2operator = new ConcurrentHashMap<>(); /** * Constructs the object. * * @param name actor name - * @param operators the operations supported by this actor */ - public ActorImpl(String name, Operator... operators) { + public ActorImpl(String name) { super(name); - setOperators(Arrays.asList(operators)); } /** - * Sets the operators supported by this actor, overriding any previous list. + * Adds an operator supported by this actor. * - * @param operators the operations supported by this actor + * @param operator operation to be added */ - protected void setOperators(List<Operator> operators) { + protected synchronized void addOperator(Operator operator) { + /* + * This method is "synchronized" to prevent the state from changing while the + * operator is added. The map, itself, does not need synchronization as it's a + * concurrent map. + */ + if (isConfigured()) { throw new IllegalStateException("attempt to set operators on a configured actor: " + getName()); } - Map<String, Operator> map = new HashMap<>(); - for (Operator newOp : operators) { - map.compute(newOp.getName(), (opName, existingOp) -> { - if (existingOp == null) { - return newOp; - } - - // TODO: should this throw an exception? - logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, - existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName()); - return existingOp; - }); - } + name2operator.compute(operator.getName(), (opName, existingOp) -> { + if (existingOp == null) { + return operator; + } - this.name2operator = ImmutableMap.copyOf(map); + logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, + existingOp.getClass().getSimpleName(), operator.getClass().getSimpleName()); + return existingOp; + }); } @Override @@ -177,7 +173,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement for (Operator oper : name2operator.values()) { if (oper.isConfigured()) { - Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); + Util.runFunction(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); } else { logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName()); @@ -195,7 +191,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement // @formatter:off name2operator.values().forEach( - oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); + oper -> Util.runFunction(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); // @formatter:on } @@ -208,7 +204,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement logger.info("shutting down operations for actor {}", actorName); // @formatter:off - name2operator.values().forEach(oper -> Util.logException(oper::shutdown, + name2operator.values().forEach(oper -> Util.runFunction(oper::shutdown, "failed to shutdown operation {}.{}", actorName, oper.getName())); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java new file mode 100644 index 000000000..28b7b3924 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.function.Function; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams; + +/** + * Actor that uses HTTP, where the only additional property that an operator needs is a + * URL. The actor's parameters must be an {@link HttpActorParams} and its operator + * parameters are expected to be an {@link HttpParams}. + */ +public class HttpActor extends ActorImpl { + + /** + * Constructs the object. + * + * @param name actor's name + */ + public HttpActor(String name) { + super(name); + } + + /** + * Translates the parameters to an {@link HttpActorParams} and then creates a function + * that will extract operator-specific parameters. + */ + @Override + protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { + String actorName = getName(); + + // @formatter:off + return Util.translate(actorName, actorParameters, HttpActorParams.class) + .doValidation(actorName) + .makeOperationParameters(actorName); + // @formatter:on + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java new file mode 100644 index 000000000..566492907 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.onap.policy.common.endpoints.http.client.HttpClientFactory; +import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +/** + * Operator that uses HTTP. The operator's parameters must be a {@link HttpParams}. + */ +public class HttpOperator extends OperatorPartial { + + @Getter(AccessLevel.PROTECTED) + private HttpClient client; + + @Getter + private long timeoutSec; + + /** + * URI path for this particular operation. + */ + @Getter + private String path; + + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + */ + public HttpOperator(String actorName, String name) { + super(actorName, name); + } + + /** + * Translates the parameters to an {@link HttpParams} and then extracts the relevant + * values. + */ + @Override + protected void doConfigure(Map<String, Object> parameters) { + HttpParams params = Util.translate(getFullName(), parameters, HttpParams.class); + ValidationResult result = params.validate(getFullName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + client = getClientFactory().get(params.getClientName()); + path = params.getPath(); + timeoutSec = params.getTimeoutSec(); + } + + // these may be overridden by junits + + protected HttpClientFactory getClientFactory() { + return HttpClientFactoryInstance.getClientFactory(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java index 80d8fbd04..df5258d71 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java @@ -20,37 +20,61 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Function; +import lombok.AccessLevel; import lombok.Getter; +import lombok.Setter; import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Operator; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.policy.Policy; import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Partial implementation of an operator. Subclasses can choose to simply implement - * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override - * {@link #doOperationAsFuture(ControlLoopOperationParams)}. + * Partial implementation of an operator. In general, it's preferable that subclasses + * would override + * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome) + * startOperationAsync()}. However, if that proves to be too difficult, then they can + * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) + * doOperation()}. In addition, if the operation requires any preprocessor steps, the + * subclass may choose to override + * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}. + * <p/> + * The futures returned by the methods within this class can be canceled, and will + * propagate the cancellation to any subtasks. Thus it is also expected that any futures + * returned by overridden methods will do the same. Of course, if a class overrides + * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()}, + * then there's little that can be done to cancel that particular operation. */ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator { private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class); - private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString(); - private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString(); - private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString(); + /** + * Executor to be used for tasks that may perform blocking I/O. The default executor + * simply launches a new thread for each command that is submitted to it. + * <p/> + * May be overridden by junit tests. + */ + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private Executor blockingExecutor = command -> { + Thread thread = new Thread(command); + thread.setDaemon(true); + thread.start(); + }; @Getter private final String actorName; @@ -103,94 +127,52 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } @Override - public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) { + public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) { if (!isAlive()) { throw new IllegalStateException("operation is not running: " + getFullName()); } - final Executor executor = params.getExecutor(); - // allocate a controller for the entire operation - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params); + CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params); if (preproc == null) { // no preprocessor required - just start the operation return startOperationAttempt(params, controller, 1); } - // propagate "stop" to the preprocessor - controller.add(preproc); - /* * Do preprocessor first and then, if successful, start the operation. Note: * operations create their own outcome, ignoring the outcome from any previous * steps. + * + * Wrap the preprocessor to ensure "stop" is propagated to it. */ - preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor) - .thenComposeAsync(handleFailure(params, controller), executor) - .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)), - executor); - - return controller; - } - - /** - * Starts an operation's preprocessor step(s). If the preprocessor fails, then it - * invokes the started and completed call-backs. - * - * @param params operation parameters - * @return a future that will return the preprocessor outcome, or {@code null} if this - * operation needs no preprocessor - */ - protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) { - logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId()); - - final Executor executor = params.getExecutor(); - final ControlLoopOperation operation = params.makeOutcome(); - - final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc = - doPreprocessorAsFuture(params); - if (preproc == null) { - // no preprocessor required - return null; - } - - // allocate a controller for the preprocessor steps - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); - - /* - * Don't mark it complete until we've built the whole pipeline. This will prevent - * the operation from starting until after it has been successfully built (i.e., - * without generating any exceptions). - */ - final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>(); - // @formatter:off - firstFuture - .thenComposeAsync(controller.add(preproc), executor) - .exceptionally(fromException(params, operation)) - .whenCompleteAsync(controller.delayedComplete(), executor); + controller.wrap(preproc) + .exceptionally(fromException(params, "preprocessor of operation")) + .thenCompose(handlePreprocessorFailure(params, controller)) + .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1)); // @formatter:on - // start the pipeline - firstFuture.complete(operation); - return controller; } /** * Handles a failure in the preprocessor pipeline. If a failure occurred, then it - * invokes the call-backs and returns a failed outcome. Otherwise, it returns the - * outcome that it received. + * invokes the call-backs, marks the controller complete, and returns an incomplete + * future, effectively halting the pipeline. Otherwise, it returns the outcome that it + * received. + * <p/> + * Assumes that no callbacks have been invoked yet. * * @param params operation parameters * @param controller pipeline controller * @return a function that checks the outcome status and continues, if successful, or * indicates a failure otherwise */ - private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure( - ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) { + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure( + ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) { return outcome -> { @@ -207,19 +189,21 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj // propagate "stop" to the callbacks controller.add(callbacks); - final ControlLoopOperation outcome2 = params.makeOutcome(); + final OperationOutcome outcome2 = params.makeOutcome(); // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) - outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString()); + outcome2.setResult(PolicyResult.FAILURE_GUARD); outcome2.setMessage(outcome != null ? outcome.getMessage() : null); - CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenApplyAsync(callbackCompleted(params, callbacks), executor) - .whenCompleteAsync(controller.delayedRemove(callbacks), executor) + // @formatter:off + CompletableFuture.completedFuture(outcome2) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .whenCompleteAsync(callbackCompleted(params, callbacks), executor) .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on - return controller; + return new CompletableFuture<>(); }; } @@ -237,8 +221,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return a function that will start the preprocessor and returns its outcome, or * {@code null} if this operation needs no preprocessor */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { return null; } @@ -251,20 +234,12 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt attempt number, typically starting with 1 * @return a future that will return the final result of all attempts */ - private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params, - PipelineControllerFuture<ControlLoopOperation> controller, int attempt) { - - final Executor executor = params.getExecutor(); - - CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt); + private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, int attempt) { // propagate "stop" to the operation attempt - controller.add(future); - - // detach when complete - future.whenCompleteAsync(controller.delayedRemove(future), executor) - .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor()) - .whenCompleteAsync(controller.delayedComplete(), executor); + controller.wrap(startAttemptWithoutRetries(params, attempt)) + .thenCompose(retryOnFailure(params, controller, attempt)); return controller; } @@ -276,40 +251,32 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt attempt number, typically starting with 1 * @return a future that will return the result of a single operation attempt */ - private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params, + private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params, int attempt) { logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); final Executor executor = params.getExecutor(); - final ControlLoopOperation outcome = params.makeOutcome(); + final OperationOutcome outcome = params.makeOutcome(); final CallbackManager callbacks = new CallbackManager(); // this operation attempt gets its own controller - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); // propagate "stop" to the callbacks controller.add(callbacks); - /* - * Don't mark it complete until we've built the whole pipeline. This will prevent - * the operation from starting until after it has been successfully built (i.e., - * without generating any exceptions). - */ - final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>(); - // @formatter:off - CompletableFuture<ControlLoopOperation> future2 = - firstFuture.thenComposeAsync(verifyRunning(controller, params), executor) - .thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor); + CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2))); // @formatter:on // handle timeouts, if specified - long timeoutMillis = getTimeOutMillis(params.getPolicy()); + long timeoutMillis = getTimeOutMillis(params.getTimeoutSec()); if (timeoutMillis > 0) { logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId()); - future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); } /* @@ -321,16 +288,13 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj */ // @formatter:off - future2.exceptionally(fromException(params, outcome)) - .thenApplyAsync(setRetryFlag(params, attempt), executor) - .thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenApplyAsync(callbackCompleted(params, callbacks), executor) + future.exceptionally(fromException(params, "operation")) + .thenApply(setRetryFlag(params, attempt)) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .whenCompleteAsync(callbackCompleted(params, callbacks), executor) .whenCompleteAsync(controller.delayedComplete(), executor); // @formatter:on - // start the pipeline - firstFuture.complete(outcome); - return controller; } @@ -340,8 +304,8 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param outcome outcome to examine * @return {@code true} if the outcome was successful */ - protected boolean isSuccess(ControlLoopOperation outcome) { - return OUTCOME_SUCCESS.equals(outcome.getOutcome()); + protected boolean isSuccess(OperationOutcome outcome) { + return (outcome.getResult() == PolicyResult.SUCCESS); } /** @@ -351,13 +315,29 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return {@code true} if the outcome is not {@code null} and was a failure * <i>and</i> was associated with this operator, {@code false} otherwise */ - protected boolean isActorFailed(ControlLoopOperation outcome) { - return OUTCOME_FAILURE.equals(getActorOutcome(outcome)); + protected boolean isActorFailed(OperationOutcome outcome) { + return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); + } + + /** + * Determines if the given outcome is for this operation. + * + * @param outcome outcome to examine + * @return {@code true} if the outcome is for this operation, {@code false} otherwise + */ + protected boolean isSameOperation(OperationOutcome outcome) { + return OperationOutcome.isFor(outcome, getActorName(), getName()); } /** * Invokes the operation as a "future". This method simply invokes - * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future". + * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor + * "blocking executor"}, returning the result via a "future". + * <p/> + * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using + * the executor in the "params", as that may bring the background thread pool to a + * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used + * instead. * <p/> * This method assumes the following: * <ul> @@ -373,31 +353,23 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return a function that will start the operation and return its result when * complete */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture( - ControlLoopOperationParams params, int attempt) { - - /* - * TODO As doOperation() may perform blocking I/O, this should be launched in its - * own thread to prevent the ForkJoinPool from being tied up. Should probably - * provide a method to make that easy. - */ + protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt, + OperationOutcome outcome) { - return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation), - params.getExecutor()); + return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor()); } /** * Low-level method that performs the operation. This can make the same assumptions * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This - * method throws an {@link UnsupportedOperationException}. + * particular method simply throws an {@link UnsupportedOperationException}. * * @param params operation parameters * @param attempt attempt number, typically starting with 1 * @param operation the operation being performed * @return the outcome of the operation */ - protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, - ControlLoopOperation operation) { + protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) { throw new UnsupportedOperationException("start operation " + getFullName()); } @@ -411,8 +383,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt latest attempt number, starting with 1 * @return a function to get the next future to execute */ - private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params, - int attempt) { + private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) { return operation -> { if (operation != null && !isActorFailed(operation)) { @@ -424,22 +395,22 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } // get a non-null operation - ControlLoopOperation oper2; + OperationOutcome oper2; if (operation != null) { oper2 = operation; } else { oper2 = params.makeOutcome(); - oper2.setOutcome(OUTCOME_FAILURE); + oper2.setResult(PolicyResult.FAILURE); } - if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0 - && attempt > params.getPolicy().getRetry()) { + Integer retry = params.getRetry(); + if (retry != null && retry > 0 && attempt > retry) { /* * retries were specified and we've already tried them all - change to * FAILURE_RETRIES */ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); - oper2.setOutcome(OUTCOME_RETRIES); + oper2.setResult(PolicyResult.FAILURE_RETRIES); } return oper2; @@ -456,21 +427,24 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt latest attempt number, starting with 1 * @return a function to get the next future to execute */ - private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure( - ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller, + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure( + ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller, int attempt) { return operation -> { if (!isActorFailed(operation)) { // wrong type or wrong operation - just leave it as is logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + controller.complete(operation); + return new CompletableFuture<>(); } - if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) { + Integer retry = params.getRetry(); + if (retry == null || retry <= 0) { // no retries - already marked as FAILURE, so just return it logger.info("operation {} no retries for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + controller.complete(operation); + return new CompletableFuture<>(); } @@ -484,100 +458,279 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } /** - * Gets the outcome of an operation for this operation. + * Converts an exception into an operation outcome, returning a copy of the outcome to + * prevent background jobs from changing it. * - * @param operation operation whose outcome is to be extracted - * @return the outcome of the given operation, if it's for this operator, {@code null} - * otherwise + * @param params operation parameters + * @param type type of item throwing the exception + * @return a function that will convert an exception into an operation outcome */ - protected String getActorOutcome(ControlLoopOperation operation) { - if (operation == null) { - return null; - } + private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) { - if (!getActorName().equals(operation.getActor())) { - return null; - } + return thrown -> { + OperationOutcome outcome = params.makeOutcome(); + + logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId(), thrown); + + return setOutcome(params, outcome, thrown); + }; + } + + /** + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params, + List<CompletableFuture<OperationOutcome>> futures) { + + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any + * outstanding futures when one completes. + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + final Executor executor = params.getExecutor(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + // @formatter:off + CompletableFuture.anyOf(futures) + .thenApply(object -> (OperationOutcome) object) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; + } + + /** + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels + * the futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params, + List<CompletableFuture<OperationOutcome>> futures) { - if (!getName().equals(operation.getOperation())) { - return null; + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = allOf(params, arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the + * futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + + @SuppressWarnings("rawtypes") + CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + + // record the outcomes of each future when it completes + for (int count = 0; count < futures2.length; ++count) { + final int count2 = count; + futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); } - return operation.getOutcome(); + CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes)); + + return controller; } /** - * Gets a function that will start the next step, if the current operation was - * successful, or just return the current operation, otherwise. + * Attaches the given futures to the controller. + * + * @param controller master controller for all of the futures + * @param futures futures to be attached to the controller + */ + private void attachFutures(PipelineControllerFuture<OperationOutcome> controller, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + // attach each task + for (CompletableFuture<OperationOutcome> future : futures) { + controller.add(future); + } + } + + /** + * Combines the outcomes from a set of tasks. * * @param params operation parameters - * @param nextStep function that will invoke the next step, passing it the operation - * @return a function that will start the next step + * @param future future to be completed with the combined result + * @param outcomes outcomes to be examined */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess( - ControlLoopOperationParams params, - Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) { + private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params, + CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) { - return operation -> { + return (unused, thrown) -> { + if (thrown != null) { + future.completeExceptionally(thrown); + return; + } - if (operation == null) { - logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId()); - ControlLoopOperation outcome = params.makeOutcome(); - outcome.setOutcome(OUTCOME_FAILURE); - return CompletableFuture.completedFuture(outcome); + // identify the outcome with the highest priority + OperationOutcome outcome = outcomes[0]; + int priority = detmPriority(outcome); - } else if (isSuccess(operation)) { - logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId()); - return nextStep.apply(operation); + // start with "1", as we've already dealt with "0" + for (int count = 1; count < outcomes.length; ++count) { + OperationOutcome outcome2 = outcomes[count]; + int priority2 = detmPriority(outcome2); - } else { - logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + if (priority2 > priority) { + outcome = outcome2; + priority = priority2; + } } + + logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(), + (outcome == null ? null : outcome.getResult()), params.getRequestId()); + + future.complete(outcome); }; } /** - * Converts an exception into an operation outcome, returning a copy of the outcome to - * prevent background jobs from changing it. + * Determines the priority of an outcome based on its result. * - * @param params operation parameters - * @param operation current operation - * @return a function that will convert an exception into an operation outcome + * @param outcome outcome to examine, or {@code null} + * @return the outcome's priority */ - private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params, - ControlLoopOperation operation) { + protected int detmPriority(OperationOutcome outcome) { + if (outcome == null) { + return 1; + } - return thrown -> { - logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(), - params.getRequestId(), thrown); + switch (outcome.getResult()) { + case SUCCESS: + return 0; + + case FAILURE_GUARD: + return 2; + + case FAILURE_RETRIES: + return 3; + + case FAILURE: + return 4; + + case FAILURE_TIMEOUT: + return 5; + + case FAILURE_EXCEPTION: + default: + return 6; + } + } + + /** + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. + * + * @param params operation parameters + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param outcome outcome of the previous task + * @param tasks tasks to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task's future. Otherwise, it returns an incomplete future and completes + * the controller instead. + */ + // @formatter:off + protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, + boolean checkSuccess, OperationOutcome outcome, + CompletableFuture<OperationOutcome> task) { + // @formatter:on + if (checkSuccess && !isSuccess(outcome)) { /* - * Must make a copy of the operation, as the original could be changed by - * background jobs that might still be running. + * must complete before canceling so that cancel() doesn't cause controller to + * complete */ - return setOutcome(params, new ControlLoopOperation(operation), thrown); - }; + controller.complete(outcome); + task.cancel(false); + return new CompletableFuture<>(); + } + + return controller.wrap(task); } /** - * Gets a function to verify that the operation is still running. If the pipeline is - * not running, then it returns an incomplete future, which will effectively halt - * subsequent operations in the pipeline. This method is intended to be used with one - * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods. + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. * - * @param controller pipeline controller * @param params operation parameters - * @return a function to verify that the operation is still running + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param tasks tasks to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task's future. Otherwise, it returns an incomplete future and completes + * the controller instead. */ - protected <T> Function<T, CompletableFuture<T>> verifyRunning( - PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) { + // @formatter:off + protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, + boolean checkSuccess, + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) { + // @formatter:on + + return outcome -> { + + if (!controller.isRunning()) { + return new CompletableFuture<>(); + } - return value -> { - boolean running = controller.isRunning(); - logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId()); + if (checkSuccess && !isSuccess(outcome)) { + controller.complete(outcome); + return new CompletableFuture<>(); + } - return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>()); + return controller.wrap(task.apply(outcome)); }; } @@ -591,10 +744,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param callbacks used to determine if the start callback can be invoked * @return a function that sets the start time and invokes the callback */ - private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params, + private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params, CallbackManager callbacks) { - return outcome -> { + return (outcome, thrown) -> { if (callbacks.canStart()) { // haven't invoked "start" callback yet @@ -602,8 +755,6 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj outcome.setEnd(null); params.callbackStarted(outcome); } - - return outcome; }; } @@ -621,18 +772,16 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param callbacks used to determine if the end callback can be invoked * @return a function that sets the end time and invokes the callback */ - private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params, + private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params, CallbackManager callbacks) { - return operation -> { + return (outcome, thrown) -> { if (callbacks.canEnd()) { - operation.setStart(callbacks.getStartTime()); - operation.setEnd(callbacks.getEndTime()); - params.callbackCompleted(operation); + outcome.setStart(callbacks.getStartTime()); + outcome.setEnd(callbacks.getEndTime()); + params.callbackCompleted(outcome); } - - return operation; }; } @@ -643,7 +792,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param operation operation to be updated * @return the updated operation */ - protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation, + protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation, Throwable thrown) { PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); return setOutcome(params, operation, result); @@ -657,10 +806,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param result result of the operation * @return the updated operation */ - protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation, + protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation, PolicyResult result) { logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); - operation.setOutcome(result.toString()); + operation.setResult(result); operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG : ControlLoopOperation.FAILED_MSG); @@ -687,71 +836,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * Gets the operation timeout. Subclasses may override this method to obtain the * timeout in some other way (e.g., through configuration properties). * - * @param policy policy from which to extract the timeout + * @param timeoutSec timeout, in seconds, or {@code null} * @return the operation timeout, in milliseconds */ - protected long getTimeOutMillis(Policy policy) { - Integer timeoutSec = policy.getTimeout(); + protected long getTimeOutMillis(Integer timeoutSec) { return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); } - - /** - * Manager for "start" and "end" callbacks. - */ - private static class CallbackManager implements Runnable { - private final AtomicReference<Instant> startTime = new AtomicReference<>(); - private final AtomicReference<Instant> endTime = new AtomicReference<>(); - - /** - * Determines if the "start" callback can be invoked. If so, it sets the - * {@link #startTime} to the current time. - * - * @return {@code true} if the "start" callback can be invoked, {@code false} - * otherwise - */ - public boolean canStart() { - return startTime.compareAndSet(null, Instant.now()); - } - - /** - * Determines if the "end" callback can be invoked. If so, it sets the - * {@link #endTime} to the current time. - * - * @return {@code true} if the "end" callback can be invoked, {@code false} - * otherwise - */ - public boolean canEnd() { - return endTime.compareAndSet(null, Instant.now()); - } - - /** - * Gets the start time. - * - * @return the start time, or {@code null} if {@link #canStart()} has not been - * invoked yet. - */ - public Instant getStartTime() { - return startTime.get(); - } - - /** - * Gets the end time. - * - * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked - * yet. - */ - public Instant getEndTime() { - return endTime.get(); - } - - /** - * Prevents further callbacks from being executed by setting {@link #startTime} - * and {@link #endTime}. - */ - @Override - public void run() { - canStart(); - canEnd(); - } - } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java index 08aba81f2..57fce40d7 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java @@ -20,6 +20,7 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -32,11 +33,11 @@ import lombok.Getter; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.BeanValidator; import org.onap.policy.common.parameters.annotations.NotNull; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Util; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; -import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.Target; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,36 +50,67 @@ import org.slf4j.LoggerFactory; @AllArgsConstructor @EqualsAndHashCode public class ControlLoopOperationParams { - private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class); - public static final String UNKNOWN = "-unknown-"; - + /** + * Actor name. + */ + @NotNull + private String actor; /** - * The actor service in which to find the actor/operation. + * Actor service in which to find the actor/operation. */ @NotNull private ActorService actorService; /** - * The event for which the operation applies. + * Event for which the operation applies. */ @NotNull private ControlLoopEventContext context; /** - * The executor to use to run the operation. + * Executor to use to run the operation. */ @NotNull @Builder.Default private Executor executor = ForkJoinPool.commonPool(); /** - * The policy associated with the operation. + * Operation name. + */ + @NotNull + private String operation; + + /** + * Payload data for the request. + */ + private Map<String, String> payload; + + /** + * Number of retries allowed, or {@code null} if no retries. + */ + private Integer retry; + + /** + * The entity's target information. May be {@code null}, depending on the requirement + * of the operation to be invoked. + */ + private Target target; + + /** + * Target entity. */ @NotNull - private Policy policy; + private String targetEntity; + + /** + * Timeout, in seconds, or {@code null} if no timeout. Zero and negative values also + * imply no timeout. + */ + @Builder.Default + private Integer timeoutSec = 300; /** * The function to invoke when the operation starts. This is optional. @@ -87,7 +119,7 @@ public class ControlLoopOperationParams { * may happen if the current operation requires other operations to be performed first * (e.g., A&AI queries, guard checks). */ - private Consumer<ControlLoopOperation> startCallback; + private Consumer<OperationOutcome> startCallback; /** * The function to invoke when the operation completes. This is optional. @@ -96,13 +128,7 @@ public class ControlLoopOperationParams { * may happen if the current operation requires other operations to be performed first * (e.g., A&AI queries, guard checks). */ - private Consumer<ControlLoopOperation> completeCallback; - - /** - * Target entity. - */ - @NotNull - private String target; + private Consumer<OperationOutcome> completeCallback; /** * Starts the specified operation. @@ -110,7 +136,7 @@ public class ControlLoopOperationParams { * @return a future that will return the result of the operation * @throws IllegalArgumentException if the parameters are invalid */ - public CompletableFuture<ControlLoopOperation> start() { + public CompletableFuture<OperationOutcome> start() { BeanValidationResult result = validate(); if (!result.isValid()) { logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(), @@ -120,31 +146,13 @@ public class ControlLoopOperationParams { // @formatter:off return actorService - .getActor(policy.getActor()) - .getOperator(policy.getRecipe()) + .getActor(getActor()) + .getOperator(getOperation()) .startOperation(this); // @formatter:on } /** - * Gets the name of the actor from the policy. - * - * @return the actor name, or {@link #UNKNOWN} if no name is available - */ - public String getActor() { - return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor()); - } - - /** - * Gets the name of the operation from the policy. - * - * @return the operation name, or {@link #UNKNOWN} if no name is available - */ - public String getOperation() { - return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe()); - } - - /** * Gets the requested ID of the associated event. * * @return the event's request ID, or {@code null} if no request ID is available @@ -158,13 +166,13 @@ public class ControlLoopOperationParams { * * @return a new operation outcome */ - public ControlLoopOperation makeOutcome() { - ControlLoopOperation operation = new ControlLoopOperation(); - operation.setActor(getActor()); - operation.setOperation(getOperation()); - operation.setTarget(target); + public OperationOutcome makeOutcome() { + OperationOutcome outcome = new OperationOutcome(); + outcome.setActor(getActor()); + outcome.setOperation(getOperation()); + outcome.setTarget(targetEntity); - return operation; + return outcome; } /** @@ -173,11 +181,11 @@ public class ControlLoopOperationParams { * * @param operation the operation that is being started */ - public void callbackStarted(ControlLoopOperation operation) { + public void callbackStarted(OperationOutcome operation) { logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId()); if (startCallback != null) { - Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", + Util.runFunction(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", operation.getActor(), operation.getOperation(), getRequestId()); } } @@ -188,12 +196,12 @@ public class ControlLoopOperationParams { * * @param operation the operation that is being started */ - public void callbackCompleted(ControlLoopOperation operation) { + public void callbackCompleted(OperationOutcome operation) { logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(), - operation.getOutcome(), getRequestId()); + operation.getResult(), getRequestId()); if (completeCallback != null) { - Util.logException(() -> completeCallback.accept(operation), + Util.runFunction(() -> completeCallback.accept(operation), "{}.{}: complete-callback threw an exception for {}", operation.getActor(), operation.getOperation(), getRequestId()); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java index d34a3fb5b..1d64a8710 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java @@ -101,7 +101,7 @@ public class ListenerManager { */ protected void runListener(Runnable listener) { // TODO do this asynchronously? - Util.logException(listener, "pipeline listener {} threw an exception", listener); + Util.runFunction(listener, "pipeline listener {} threw an exception", listener); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java index 96c8f9e05..92843e28a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java @@ -23,41 +23,70 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline; import static org.onap.policy.controlloop.actorserviceprovider.Util.ident; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import lombok.NoArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Pipeline controller, used by operations within the pipeline to determine if they should - * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the - * pipeline. + * continue to run. Whenever this is canceled or completed, it automatically cancels all + * futures and runs all listeners that have been added. */ @NoArgsConstructor public class PipelineControllerFuture<T> extends CompletableFuture<T> { private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class); + private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception"; + private static final String CANCEL_MSG = "{}: cancel future"; + private static final String COMPLETE_MSG = "{}: complete future"; + /** * Tracks items added to this controller via one of the <i>add</i> methods. */ private final FutureManager futures = new FutureManager(); - /** - * Cancels and stops the pipeline, in that order. - */ @Override public boolean cancel(boolean mayInterruptIfRunning) { - try { - logger.trace("{}: cancel future", ident(this)); - return super.cancel(mayInterruptIfRunning); + return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this)); + } - } finally { - futures.stop(); - } + @Override + public boolean complete(T value) { + return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this)); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this)); + } + + @Override + public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor); + } + + @Override + public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this))); + } + + @Override + public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) { + logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit); + return super.completeOnTimeout(value, timeout, unit); + } + + @Override + public <U> PipelineControllerFuture<U> newIncompleteFuture() { + return new PipelineControllerFuture<>(); } /** @@ -67,11 +96,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { * * @return a function that removes the given future */ - public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) { - return (value, thrown) -> { - logger.trace("{}: remove future {}", ident(this), ident(future)); - remove(future); - }; + public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) { + return (value, thrown) -> remove(future); } /** @@ -81,11 +107,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { * * @return a function that removes the given listener */ - public BiConsumer<T, Throwable> delayedRemove(Runnable listener) { - return (value, thrown) -> { - logger.trace("{}: remove listener {}", ident(this), ident(listener)); - remove(listener); - }; + public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) { + return (value, thrown) -> remove(listener); } /** @@ -98,25 +121,43 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { public BiConsumer<T, Throwable> delayedComplete() { return (value, thrown) -> { if (thrown == null) { - logger.trace("{}: complete and stop future", ident(this)); complete(value); } else { - logger.trace("{}: complete exceptionally and stop future", ident(this)); completeExceptionally(thrown); } - - futures.stop(); }; } /** + * Adds a future to the controller and arranges for it to be removed from the + * controller when it completes, whether or not it throws an exception. If the + * controller has already been stopped, then the future is canceled and a new, + * incomplete future is returned. + * + * @param future future to be wrapped + * @return a new future + */ + public CompletableFuture<T> wrap(CompletableFuture<T> future) { + if (!isRunning()) { + logger.trace("{}: not running, skipping next task {}", ident(this), ident(future)); + future.cancel(false); + return new CompletableFuture<>(); + } + + add(future); + return future.whenComplete(this.delayedRemove(future)); + } + + /** * Adds a function whose return value is to be canceled when this controller is * stopped. Note: if the controller is already stopped, then the function will * <i>not</i> be executed. * - * @param futureMaker function to be invoked in the future + * @param futureMaker function to be invoked to create the future + * @return a function to create the future and arrange for it to be managed by this + * controller */ - public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) { + public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) { return input -> { if (!isRunning()) { @@ -127,7 +168,7 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { CompletableFuture<F> future = futureMaker.apply(input); add(future); - return future; + return future.whenComplete(delayedRemove(future)); }; } @@ -154,4 +195,26 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { logger.trace("{}: remove listener {}", ident(this), ident(listener)); futures.remove(listener); } + + /** + * Performs an operation, stops the futures, and returns the value from the operation. + * Logs a message using the given arguments. + * + * + * @param <R> type of value to be returned + * @param supplier operation to perform + * @param message message to be logged + * @param args message arguments to fill "{}" place-holders + * @return the operation's result + */ + private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) { + try { + logger.trace(message, args); + return supplier.get(); + + } finally { + logger.trace("{}: stopping this future", ident(this)); + futures.stop(); + } + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java new file mode 100644 index 000000000..31c6d2077 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java @@ -0,0 +1,172 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.policy.PolicyResult; + +public class AsyncResponseHandlerTest { + + private static final String ACTOR = "my-actor"; + private static final String OPERATION = "my-operation"; + private static final UUID REQ_ID = UUID.randomUUID(); + private static final String TEXT = "some text"; + + private VirtualControlLoopEvent event; + private ControlLoopEventContext context; + private ControlLoopOperationParams params; + private OperationOutcome outcome; + private MyHandler handler; + + /** + * Initializes all fields, including {@link #handler}. + */ + @Before + public void setUp() { + event = new VirtualControlLoopEvent(); + event.setRequestId(REQ_ID); + + context = new ControlLoopEventContext(event); + params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).context(context).build(); + outcome = params.makeOutcome(); + + handler = new MyHandler(params, outcome); + } + + @Test + public void testAsyncResponseHandler_testGetParams_testGetOutcome() { + assertSame(params, handler.getParams()); + assertSame(outcome, handler.getOutcome()); + } + + @Test + public void testHandle() { + CompletableFuture<String> future = new CompletableFuture<>(); + handler.handle(future).complete(outcome); + + assertTrue(future.isCancelled()); + } + + @Test + public void testCompleted() throws Exception { + CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>()); + handler.completed(TEXT); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + assertEquals(PolicyResult.FAILURE_RETRIES, outcome.getResult()); + assertEquals(TEXT, outcome.getMessage()); + } + + /** + * Tests completed() when doCompleted() throws an exception. + */ + @Test + public void testCompletedException() throws Exception { + IllegalStateException except = new IllegalStateException(); + + outcome = params.makeOutcome(); + handler = new MyHandler(params, outcome) { + @Override + protected OperationOutcome doComplete(String rawResponse) { + throw except; + } + }; + + CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>()); + handler.completed(TEXT); + assertTrue(result.isCompletedExceptionally()); + + AtomicReference<Throwable> thrown = new AtomicReference<>(); + result.whenComplete((unused, thrown2) -> thrown.set(thrown2)); + + assertSame(except, thrown.get()); + } + + @Test + public void testFailed() throws Exception { + IllegalStateException except = new IllegalStateException(); + + CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>()); + handler.failed(except); + + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + assertEquals(PolicyResult.FAILURE_GUARD, outcome.getResult()); + } + + /** + * Tests failed() when doFailed() throws an exception. + */ + @Test + public void testFailedException() throws Exception { + IllegalStateException except = new IllegalStateException(); + + outcome = params.makeOutcome(); + handler = new MyHandler(params, outcome) { + @Override + protected OperationOutcome doFailed(Throwable thrown) { + throw except; + } + }; + + CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>()); + handler.failed(except); + assertTrue(result.isCompletedExceptionally()); + + AtomicReference<Throwable> thrown = new AtomicReference<>(); + result.whenComplete((unused, thrown2) -> thrown.set(thrown2)); + + assertSame(except, thrown.get()); + } + + private class MyHandler extends AsyncResponseHandler<String> { + + public MyHandler(ControlLoopOperationParams params, OperationOutcome outcome) { + super(params, outcome); + } + + @Override + protected OperationOutcome doComplete(String rawResponse) { + OperationOutcome outcome = getOutcome(); + outcome.setResult(PolicyResult.FAILURE_RETRIES); + outcome.setMessage(rawResponse); + return outcome; + } + + @Override + protected OperationOutcome doFailed(Throwable thrown) { + OperationOutcome outcome = getOutcome(); + outcome.setResult(PolicyResult.FAILURE_GUARD); + return outcome; + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java new file mode 100644 index 000000000..44606cb14 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.time.Instant; +import org.junit.Before; +import org.junit.Test; + +public class CallbackManagerTest { + + private CallbackManager mgr; + + @Before + public void setUp() { + mgr = new CallbackManager(); + } + + @Test + public void testCanStart_testGetStartTime() { + // null until canXxx() is called + assertNull(mgr.getStartTime()); + + assertTrue(mgr.canStart()); + + Instant time = mgr.getStartTime(); + assertNotNull(time); + assertNull(mgr.getEndTime()); + + // false for now on + assertFalse(mgr.canStart()); + assertFalse(mgr.canStart()); + + assertEquals(time, mgr.getStartTime()); + } + + @Test + public void testCanEnd_testGetEndTime() { + // null until canXxx() is called + assertNull(mgr.getEndTime()); + assertNull(mgr.getEndTime()); + + assertTrue(mgr.canEnd()); + + Instant time = mgr.getEndTime(); + assertNotNull(time); + assertNull(mgr.getStartTime()); + + // false for now on + assertFalse(mgr.canEnd()); + assertFalse(mgr.canEnd()); + + assertEquals(time, mgr.getEndTime()); + } + + @Test + public void testRun() { + mgr.run(); + + assertNotNull(mgr.getStartTime()); + assertNotNull(mgr.getEndTime()); + + assertFalse(mgr.canStart()); + assertFalse(mgr.canEnd()); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java new file mode 100644 index 000000000..4e9728336 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java @@ -0,0 +1,137 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.policy.PolicyResult; + +public class OperationOutcomeTest { + private static final String ACTOR = "my-actor"; + private static final String OPERATION = "my-operation"; + private static final String TARGET = "my-target"; + private static final Instant START = Instant.ofEpochMilli(10); + private static final Instant END = Instant.ofEpochMilli(20); + private static final String SUB_REQ_ID = "my-sub-request-id"; + private static final PolicyResult RESULT = PolicyResult.FAILURE_GUARD; + private static final String MESSAGE = "my-message"; + + private OperationOutcome outcome; + + @Before + public void setUp() { + outcome = new OperationOutcome(); + } + + @Test + public void testOperationOutcomeOperationOutcome() { + setAll(); + + OperationOutcome outcome2 = new OperationOutcome(outcome); + + assertEquals(ACTOR, outcome2.getActor()); + assertEquals(OPERATION, outcome2.getOperation()); + assertEquals(TARGET, outcome2.getTarget()); + assertEquals(START, outcome2.getStart()); + assertEquals(END, outcome2.getEnd()); + assertEquals(SUB_REQ_ID, outcome2.getSubRequestId()); + assertEquals(RESULT, outcome2.getResult()); + assertEquals(MESSAGE, outcome2.getMessage()); + } + + @Test + public void testToControlLoopOperation() { + setAll(); + + ControlLoopOperation outcome2 = outcome.toControlLoopOperation(); + + assertEquals(ACTOR, outcome2.getActor()); + assertEquals(OPERATION, outcome2.getOperation()); + assertEquals(TARGET, outcome2.getTarget()); + assertEquals(START, outcome2.getStart()); + assertEquals(END, outcome2.getEnd()); + assertEquals(SUB_REQ_ID, outcome2.getSubRequestId()); + assertEquals(RESULT.toString(), outcome2.getOutcome()); + assertEquals(MESSAGE, outcome2.getMessage()); + } + + /** + * Tests both isFor() methods, as one invokes the other. + */ + @Test + public void testIsFor() { + setAll(); + + // null case + assertFalse(OperationOutcome.isFor(null, ACTOR, OPERATION)); + + // actor mismatch + assertFalse(OperationOutcome.isFor(outcome, TARGET, OPERATION)); + + // operation mismatch + assertFalse(OperationOutcome.isFor(outcome, ACTOR, TARGET)); + + // null actor in outcome + outcome.setActor(null); + assertFalse(OperationOutcome.isFor(outcome, ACTOR, OPERATION)); + outcome.setActor(ACTOR); + + // null operation in outcome + outcome.setOperation(null); + assertFalse(OperationOutcome.isFor(outcome, ACTOR, OPERATION)); + outcome.setOperation(OPERATION); + + // null actor argument + assertThatThrownBy(() -> outcome.isFor(null, OPERATION)); + + // null operation argument + assertThatThrownBy(() -> outcome.isFor(ACTOR, null)); + + // true case + assertTrue(OperationOutcome.isFor(outcome, ACTOR, OPERATION)); + } + + @Test + public void testSetResult() { + outcome.setResult(PolicyResult.FAILURE_EXCEPTION); + assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult()); + + assertThatThrownBy(() -> outcome.setResult(null)); + } + + private void setAll() { + outcome.setActor(ACTOR); + outcome.setEnd(END); + outcome.setMessage(MESSAGE); + outcome.setOperation(OPERATION); + outcome.setResult(RESULT); + outcome.setStart(START); + outcome.setSubRequestId(SUB_REQ_ID); + outcome.setTarget(TARGET); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java index c652e8374..4a3f321cf 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java @@ -27,15 +27,56 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import ch.qos.logback.classic.Logger; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import lombok.Builder; import lombok.Data; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.slf4j.LoggerFactory; public class UtilTest { + private static final String MY_REQUEST = "my-request"; + private static final String URL = "my-url"; + private static final String OUT_URL = "OUT|REST|my-url"; + private static final String IN_URL = "IN|REST|my-url"; + protected static final String EXPECTED_EXCEPTION = "expected exception"; + + /** + * Used to attach an appender to the class' logger. + */ + private static final Logger logger = (Logger) LoggerFactory.getLogger(Util.class); + private static final ExtractAppender appender = new ExtractAppender(); + + /** + * Initializes statics. + */ + @BeforeClass + public static void setUpBeforeClass() { + appender.setContext(logger.getLoggerContext()); + appender.start(); + + logger.addAppender(appender); + } + + @AfterClass + public static void tearDownAfterClass() { + appender.stop(); + } + + @Before + public void setUp() { + appender.clearExtractions(); + } @Test public void testIdent() { @@ -48,11 +89,88 @@ public class UtilTest { } @Test - public void testLogException() { + public void testLogRestRequest() throws CoderException { + // log structured data + appender.clearExtractions(); + Util.logRestRequest(URL, new Abc(10, null, null)); + List<String> output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(OUT_URL).contains("{\n \"intValue\": 10\n}"); + + // log a plain string + appender.clearExtractions(); + Util.logRestRequest(URL, MY_REQUEST); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(OUT_URL).contains(MY_REQUEST); + + // exception from coder + StandardCoder coder = new StandardCoder() { + @Override + public String encode(Object object, boolean pretty) throws CoderException { + throw new CoderException(EXPECTED_EXCEPTION); + } + }; + + appender.clearExtractions(); + Util.logRestRequest(coder, URL, new Abc(11, null, null)); + output = appender.getExtracted(); + assertEquals(2, output.size()); + assertThat(output.get(0)).contains("cannot pretty-print request"); + assertThat(output.get(1)).contains(OUT_URL); + } + + @Test + public void testLogRestResponse() throws CoderException { + // log structured data + appender.clearExtractions(); + Util.logRestResponse(URL, new Abc(10, null, null)); + List<String> output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(IN_URL).contains("{\n \"intValue\": 10\n}"); + + // log null response + appender.clearExtractions(); + Util.logRestResponse(URL, null); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(IN_URL).contains("null"); + + // log a plain string + appender.clearExtractions(); + Util.logRestResponse(URL, MY_REQUEST); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(IN_URL).contains(MY_REQUEST); + + // exception from coder + StandardCoder coder = new StandardCoder() { + @Override + public String encode(Object object, boolean pretty) throws CoderException { + throw new CoderException(EXPECTED_EXCEPTION); + } + }; + + appender.clearExtractions(); + Util.logRestResponse(coder, URL, new Abc(11, null, null)); + output = appender.getExtracted(); + assertEquals(2, output.size()); + assertThat(output.get(0)).contains("cannot pretty-print response"); + assertThat(output.get(1)).contains(IN_URL); + } + + @Test + public void testRunFunction() { // no exception, no log AtomicInteger count = new AtomicInteger(); - Util.logException(() -> count.incrementAndGet(), "no error"); + Util.runFunction(() -> count.incrementAndGet(), "no error"); assertEquals(1, count.get()); + assertEquals(0, appender.getExtracted().size()); // with an exception Runnable runnable = () -> { @@ -60,8 +178,17 @@ public class UtilTest { throw new IllegalStateException("expected exception"); }; - Util.logException(runnable, "error with no args"); - Util.logException(runnable, "error {} {} arg(s)", "with", 1); + appender.clearExtractions(); + Util.runFunction(runnable, "error with no args"); + List<String> output = appender.getExtracted(); + assertEquals(1, output.size()); + assertThat(output.get(0)).contains("error with no args"); + + appender.clearExtractions(); + Util.runFunction(runnable, "error {} {} arg(s)", "with", 2); + output = appender.getExtracted(); + assertEquals(1, output.size()); + assertThat(output.get(0)).contains("error with 2 arg(s)"); } @Test @@ -123,4 +250,14 @@ public class UtilTest { private int intValue; private String strValue; } + + // throws an exception when getXxx() is used + public static class DataWithException { + @SuppressWarnings("unused") + private int intValue; + + public int getIntValue() { + throw new IllegalStateException(); + } + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java index fcc3fb12e..0d917ad3e 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java @@ -20,28 +20,55 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import java.util.Map; +import java.util.UUID; import org.junit.Before; import org.junit.Test; import org.onap.policy.controlloop.VirtualControlLoopEvent; public class ControlLoopEventContextTest { + private static final UUID REQ_ID = UUID.randomUUID(); + private Map<String, String> enrichment; private VirtualControlLoopEvent event; private ControlLoopEventContext context; + /** + * Initializes data, including {@link #context}. + */ @Before public void setUp() { + enrichment = Map.of("abc", "one", "def", "two"); + event = new VirtualControlLoopEvent(); + event.setRequestId(REQ_ID); + event.setAai(enrichment); + context = new ControlLoopEventContext(event); } @Test public void testControlLoopEventContext() { assertSame(event, context.getEvent()); + assertSame(REQ_ID, context.getRequestId()); + assertEquals(enrichment, context.getEnrichment()); + + // null event + assertThatThrownBy(() -> new ControlLoopEventContext(null)); + + // no request id, no enrichment data + event.setRequestId(null); + event.setAai(null); + context = new ControlLoopEventContext(event); + assertSame(event, context.getEvent()); + assertNotNull(context.getRequestId()); + assertEquals(Map.of(), context.getEnrichment()); } @Test diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java index 7e0c35a3f..a209fb0d8 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java @@ -34,7 +34,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -192,10 +191,10 @@ public class ActorImplTest { } @Test - public void testSetOperators() { - // cannot set operators if already configured + public void testAddOperator() { + // cannot add operators if already configured actor.configure(params); - assertThatIllegalStateException().isThrownBy(() -> actor.setOperators(Collections.emptyList())); + assertThatIllegalStateException().isThrownBy(() -> actor.addOperator(oper1)); /* * make an actor where operators two and four have names that are duplicates of @@ -367,7 +366,13 @@ public class ActorImplTest { * @return a new actor */ private ActorImpl makeActor(Operator... operators) { - return new ActorImpl(ACTOR_NAME, operators); + ActorImpl actor = new ActorImpl(ACTOR_NAME); + + for (Operator oper : operators) { + actor.addOperator(oper); + } + + return actor; } private static class MyOper extends OperatorPartial implements Operator { diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java new file mode 100644 index 000000000..2da789989 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Function; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +public class HttpActorTest { + + private static final String ACTOR = "my-actor"; + private static final String UNKNOWN = "unknown"; + private static final String CLIENT = "my-client"; + private static final long TIMEOUT = 10L; + + private HttpActor actor; + + @Before + public void setUp() { + actor = new HttpActor(ACTOR); + } + + @Test + public void testMakeOperatorParameters() { + HttpActorParams params = new HttpActorParams(); + params.setClientName(CLIENT); + params.setTimeoutSec(TIMEOUT); + params.setPath(Map.of("operA", "urlA", "operB", "urlB")); + + final HttpActor prov = new HttpActor(ACTOR); + Function<String, Map<String, Object>> maker = + prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params)); + + assertNull(maker.apply(UNKNOWN)); + + // use a TreeMap to ensure the properties are sorted + assertEquals("{clientName=my-client, path=urlA, timeoutSec=10}", + new TreeMap<>(maker.apply("operA")).toString()); + + assertEquals("{clientName=my-client, path=urlB, timeoutSec=10}", + new TreeMap<>(maker.apply("operB")).toString()); + + // with invalid actor parameters + params.setClientName(null); + assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params))) + .isInstanceOf(ParameterValidationRuntimeException.class); + } + + @Test + public void testHttpActor() { + assertEquals(ACTOR, actor.getName()); + assertEquals(ACTOR, actor.getFullName()); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java new file mode 100644 index 000000000..c006cf333 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.onap.policy.common.endpoints.http.client.HttpClientFactory; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +public class HttpOperatorTest { + + private static final String ACTOR = "my-actor"; + private static final String OPERATION = "my-name"; + private static final String CLIENT = "my-client"; + private static final String PATH = "my-path"; + private static final long TIMEOUT = 100; + + @Mock + private HttpClient client; + + private HttpOperator oper; + + /** + * Initializes fields, including {@link #oper}. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + oper = new HttpOperator(ACTOR, OPERATION); + } + + @Test + public void testDoConfigureMapOfStringObject_testGetClient_testGetPath_testGetTimeoutSec() { + assertNull(oper.getClient()); + assertNull(oper.getPath()); + assertEquals(0L, oper.getTimeoutSec()); + + oper = new HttpOperator(ACTOR, OPERATION) { + @Override + protected HttpClientFactory getClientFactory() { + HttpClientFactory factory = mock(HttpClientFactory.class); + when(factory.get(CLIENT)).thenReturn(client); + return factory; + } + }; + + HttpParams params = HttpParams.builder().clientName(CLIENT).path(PATH).timeoutSec(TIMEOUT).build(); + Map<String, Object> paramMap = Util.translateToMap(OPERATION, params); + oper.configure(paramMap); + + assertSame(client, oper.getClient()); + assertEquals(PATH, oper.getPath()); + assertEquals(TIMEOUT, oper.getTimeoutSec()); + + // test invalid parameters + paramMap.remove("path"); + assertThatThrownBy(() -> oper.configure(paramMap)).isInstanceOf(ParameterValidationRuntimeException.class); + } + + @Test + public void testHttpOperator() { + assertEquals(ACTOR, oper.getActorName()); + assertEquals(OPERATION, oper.getName()); + assertEquals(ACTOR + "." + OPERATION, oper.getFullName()); + } + + @Test + public void testGetClient() { + assertNotNull(oper.getClientFactory()); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java index 864ac829a..21bc656f2 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java @@ -29,6 +29,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.time.Instant; @@ -36,17 +37,20 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -58,9 +62,10 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; import org.onap.policy.controlloop.policy.PolicyResult; public class OperatorPartialTest { @@ -75,14 +80,10 @@ public class OperatorPartialTest { private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream() .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList()); - private static final List<String> FAILURE_STRINGS = - FAILURE_RESULTS.stream().map(Object::toString).collect(Collectors.toList()); - private VirtualControlLoopEvent event; private Map<String, Object> config; private ControlLoopEventContext context; private MyExec executor; - private Policy policy; private ControlLoopOperationParams params; private MyOper oper; @@ -92,8 +93,8 @@ public class OperatorPartialTest { private Instant tstart; - private ControlLoopOperation opstart; - private ControlLoopOperation opend; + private OperationOutcome opstart; + private OperationOutcome opend; /** * Initializes the fields, including {@link #oper}. @@ -107,13 +108,9 @@ public class OperatorPartialTest { context = new ControlLoopEventContext(event); executor = new MyExec(); - policy = new Policy(); - policy.setActor(ACTOR); - policy.setRecipe(OPERATOR); - policy.setTimeout(TIMEOUT); - params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context) - .executor(executor).policy(policy).startCallback(this::starter).target(TARGET).build(); + .executor(executor).actor(ACTOR).operation(OPERATOR).timeoutSec(TIMEOUT) + .startCallback(this::starter).targetEntity(TARGET).build(); oper = new MyOper(); oper.configure(new TreeMap<>()); @@ -133,6 +130,31 @@ public class OperatorPartialTest { } @Test + public void testGetBlockingExecutor() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + /* + * Use an operator that doesn't override getBlockingExecutor(). + */ + OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {}; + oper2.getBlockingExecutor().execute(() -> latch.countDown()); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testDoConfigure() { + oper = spy(new MyOper()); + + oper.configure(config); + verify(oper).configure(config); + + // repeat - SHOULD be run again + oper.configure(config); + verify(oper, times(2)).configure(config); + } + + @Test public void testDoStart() { oper = spy(new MyOper()); @@ -181,7 +203,7 @@ public class OperatorPartialTest { } @Test - public void testStartOperation_testVerifyRunning() { + public void testStartOperation() { verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS); } @@ -201,17 +223,13 @@ public class OperatorPartialTest { * Tests startOperation() when the operation has a preprocessor. */ @Test - public void testStartOperationWithPreprocessor_testStartPreprocessor() { + public void testStartOperationWithPreprocessor() { AtomicInteger count = new AtomicInteger(); - // @formatter:off - Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc = - oper -> CompletableFuture.supplyAsync(() -> { - count.incrementAndGet(); - oper.setOutcome(PolicyResult.SUCCESS.toString()); - return oper; - }, executor); - // @formatter:on + CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> { + count.incrementAndGet(); + return makeSuccess(); + }, executor); oper.setPreProcessor(preproc); @@ -233,7 +251,7 @@ public class OperatorPartialTest { assertNotNull(opstart); assertNotNull(opend); - assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome()); + assertEquals(PolicyResult.SUCCESS, opend.getResult()); assertEquals(MAX_PARALLEL_REQUESTS, numStart); assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount()); @@ -245,11 +263,7 @@ public class OperatorPartialTest { */ @Test public void testStartPreprocessorFailure() { - // arrange for the preprocessor to return a failure - oper.setPreProcessor(oper -> { - oper.setOutcome(PolicyResult.FAILURE_GUARD.toString()); - return CompletableFuture.completedFuture(oper); - }); + oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure())); verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD); } @@ -260,9 +274,7 @@ public class OperatorPartialTest { @Test public void testStartPreprocessorException() { // arrange for the preprocessor to throw an exception - oper.setPreProcessor(oper -> { - throw new IllegalStateException(EXPECTED_EXCEPTION); - }); + oper.setPreProcessor(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION))); verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD); } @@ -273,10 +285,7 @@ public class OperatorPartialTest { @Test public void testStartPreprocessorNotRunning() { // arrange for the preprocessor to return success, which will be ignored - oper.setPreProcessor(oper -> { - oper.setOutcome(PolicyResult.SUCCESS.toString()); - return CompletableFuture.completedFuture(oper); - }); + oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess())); oper.startOperation(params).cancel(false); assertTrue(executor.runAll()); @@ -296,8 +305,7 @@ public class OperatorPartialTest { public void testStartPreprocessorBuilderException() { oper = new MyOper() { @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { throw new IllegalStateException(EXPECTED_EXCEPTION); } }; @@ -312,51 +320,27 @@ public class OperatorPartialTest { } @Test - public void testDoPreprocessorAsFuture() { - assertNull(oper.doPreprocessorAsFuture(params)); + public void testStartPreprocessorAsync() { + assertNull(oper.startPreprocessorAsync(params)); } @Test - public void testStartOperationOnly_testDoOperationAsFuture() { + public void testStartOperationAsync() { oper.startOperation(params); assertTrue(executor.runAll()); assertEquals(1, oper.getCount()); } - /** - * Tests startOperationOnce() when - * {@link OperatorPartial#doOperationAsFuture(ControlLoopOperationParams)} throws an - * exception. - */ - @Test - public void testStartOperationOnceBuilderException() { - oper = new MyOper() { - @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture( - ControlLoopOperationParams params, int attempt) { - throw new IllegalStateException(EXPECTED_EXCEPTION); - } - }; - - oper.configure(new TreeMap<>()); - oper.start(); - - assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params)); - - // should be nothing in the queue - assertEquals(0, executor.getQueueLength()); - } - @Test public void testIsSuccess() { - ControlLoopOperation outcome = new ControlLoopOperation(); + OperationOutcome outcome = new OperationOutcome(); - outcome.setOutcome(PolicyResult.SUCCESS.toString()); + outcome.setResult(PolicyResult.SUCCESS); assertTrue(oper.isSuccess(outcome)); - for (String failure : FAILURE_STRINGS) { - outcome.setOutcome(failure); + for (PolicyResult failure : FAILURE_RESULTS) { + outcome.setResult(failure); assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome)); } } @@ -365,17 +349,17 @@ public class OperatorPartialTest { public void testIsActorFailed() { assertFalse(oper.isActorFailed(null)); - ControlLoopOperation outcome = params.makeOutcome(); + OperationOutcome outcome = params.makeOutcome(); // incorrect outcome - outcome.setOutcome(PolicyResult.SUCCESS.toString()); + outcome.setResult(PolicyResult.SUCCESS); assertFalse(oper.isActorFailed(outcome)); - outcome.setOutcome(PolicyResult.FAILURE_RETRIES.toString()); + outcome.setResult(PolicyResult.FAILURE_RETRIES); assertFalse(oper.isActorFailed(outcome)); // correct outcome - outcome.setOutcome(PolicyResult.FAILURE.toString()); + outcome.setResult(PolicyResult.FAILURE); // incorrect actor outcome.setActor(TARGET); @@ -400,7 +384,12 @@ public class OperatorPartialTest { /* * Use an operator that doesn't override doOperation(). */ - OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {}; + OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) { + @Override + protected Executor getBlockingExecutor() { + return executor; + } + }; oper2.configure(new TreeMap<>()); oper2.start(); @@ -409,7 +398,7 @@ public class OperatorPartialTest { assertTrue(executor.runAll()); assertNotNull(opend); - assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), opend.getOutcome()); + assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult()); } @Test @@ -421,36 +410,34 @@ public class OperatorPartialTest { // trigger timeout very quickly oper = new MyOper() { @Override - protected long getTimeOutMillis(Policy policy) { + protected long getTimeOutMillis(Integer timeoutSec) { return 1; } @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture( - ControlLoopOperationParams params, int attempt) { - - return outcome -> { - ControlLoopOperation outcome2 = params.makeOutcome(); - outcome2.setOutcome(PolicyResult.SUCCESS.toString()); - - /* - * Create an incomplete future that will timeout after the operation's - * timeout. If it fires before the other timer, then it will return a - * SUCCESS outcome. - */ - CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>(); - future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome, - params.getExecutor()); - - return future; - }; + protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, + int attempt, OperationOutcome outcome) { + + OperationOutcome outcome2 = params.makeOutcome(); + outcome2.setResult(PolicyResult.SUCCESS); + + /* + * Create an incomplete future that will timeout after the operation's + * timeout. If it fires before the other timer, then it will return a + * SUCCESS outcome. + */ + CompletableFuture<OperationOutcome> future = new CompletableFuture<>(); + future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome, + params.getExecutor()); + + return future; } }; oper.configure(new TreeMap<>()); oper.start(); - assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), oper.startOperation(params).get().getOutcome()); + assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.startOperation(params).get().getResult()); } /** @@ -466,40 +453,45 @@ public class OperatorPartialTest { // trigger timeout very quickly oper = new MyOper() { @Override - protected long getTimeOutMillis(Policy policy) { + protected long getTimeOutMillis(Integer timeoutSec) { return 10; } @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { - - return outcome -> { - outcome.setOutcome(PolicyResult.SUCCESS.toString()); - - /* - * Create an incomplete future that will timeout after the operation's - * timeout. If it fires before the other timer, then it will return a - * SUCCESS outcome. - */ - CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>(); - future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome, - params.getExecutor()); - - return future; + protected Executor getBlockingExecutor() { + return command -> { + Thread thread = new Thread(command); + thread.start(); }; } + + @Override + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { + + OperationOutcome outcome = makeSuccess(); + + /* + * Create an incomplete future that will timeout after the operation's + * timeout. If it fires before the other timer, then it will return a + * SUCCESS outcome. + */ + CompletableFuture<OperationOutcome> future = new CompletableFuture<>(); + future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome, + params.getExecutor()); + + return future; + } }; oper.configure(new TreeMap<>()); oper.start(); - ControlLoopOperation result = oper.startOperation(params).get(); - assertEquals(PolicyResult.SUCCESS.toString(), result.getOutcome()); + OperationOutcome result = oper.startOperation(params).get(); + assertEquals(PolicyResult.SUCCESS, result.getResult()); assertNotNull(opstart); assertNotNull(opend); - assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome()); + assertEquals(PolicyResult.SUCCESS, opend.getResult()); assertEquals(1, numStart); assertEquals(1, oper.getCount()); @@ -510,8 +502,8 @@ public class OperatorPartialTest { * Tests retry functions, when the count is set to zero and retries are exhausted. */ @Test - public void testSetRetryFlag_testRetryOnFailure_ZeroRetries() { - policy.setRetry(0); + public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() { + params = params.toBuilder().retry(0).build(); oper.setMaxFailures(10); verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE); @@ -522,7 +514,7 @@ public class OperatorPartialTest { */ @Test public void testSetRetryFlag_testRetryOnFailure_NullRetries() { - policy.setRetry(null); + params = params.toBuilder().retry(null).build(); oper.setMaxFailures(10); verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE); @@ -534,10 +526,11 @@ public class OperatorPartialTest { @Test public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() { final int maxRetries = 3; - policy.setRetry(maxRetries); + params = params.toBuilder().retry(maxRetries).build(); oper.setMaxFailures(10); - verifyRun("testVerifyRunningWhenNot", maxRetries + 1, maxRetries + 1, PolicyResult.FAILURE_RETRIES); + verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1, + PolicyResult.FAILURE_RETRIES); } /** @@ -545,7 +538,7 @@ public class OperatorPartialTest { */ @Test public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() { - policy.setRetry(10); + params = params.toBuilder().retry(10).build(); final int maxFailures = 3; oper.setMaxFailures(maxFailures); @@ -563,8 +556,8 @@ public class OperatorPartialTest { // arrange to return null from doOperation() oper = new MyOper() { @Override - protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, - ControlLoopOperation operation) { + protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, + OperationOutcome operation) { // update counters super.doOperation(params, attempt, operation); @@ -579,96 +572,55 @@ public class OperatorPartialTest { } @Test - public void testGetActorOutcome() { - assertNull(oper.getActorOutcome(null)); + public void testIsSameOperation() { + assertFalse(oper.isSameOperation(null)); - ControlLoopOperation outcome = params.makeOutcome(); - outcome.setOutcome(TARGET); + OperationOutcome outcome = params.makeOutcome(); - // wrong actor - should be null + // wrong actor - should be false outcome.setActor(null); - assertNull(oper.getActorOutcome(outcome)); + assertFalse(oper.isSameOperation(outcome)); outcome.setActor(TARGET); - assertNull(oper.getActorOutcome(outcome)); + assertFalse(oper.isSameOperation(outcome)); outcome.setActor(ACTOR); // wrong operation - should be null outcome.setOperation(null); - assertNull(oper.getActorOutcome(outcome)); + assertFalse(oper.isSameOperation(outcome)); outcome.setOperation(TARGET); - assertNull(oper.getActorOutcome(outcome)); + assertFalse(oper.isSameOperation(outcome)); outcome.setOperation(OPERATOR); - assertEquals(TARGET, oper.getActorOutcome(outcome)); - } - - @Test - public void testOnSuccess() throws Exception { - AtomicInteger count = new AtomicInteger(); - - final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep = oper -> { - count.incrementAndGet(); - return CompletableFuture.completedFuture(oper); - }; - - // pass it a null outcome - ControlLoopOperation outcome = oper.onSuccess(params, nextStep).apply(null).get(); - assertNotNull(outcome); - assertEquals(PolicyResult.FAILURE.toString(), outcome.getOutcome()); - assertEquals(0, count.get()); - - // pass it an unpopulated (i.e., failed) outcome - outcome = new ControlLoopOperation(); - assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get()); - assertEquals(0, count.get()); - - // pass it a successful outcome - outcome = params.makeOutcome(); - outcome.setOutcome(PolicyResult.SUCCESS.toString()); - assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get()); - assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome()); - assertEquals(1, count.get()); + assertTrue(oper.isSameOperation(outcome)); } /** - * Tests onSuccess() and handleFailure() when the outcome is a success. + * Tests handleFailure() when the outcome is a success. */ @Test - public void testOnSuccessTrue_testHandleFailureTrue() { - // arrange to return a success from the preprocessor - oper.setPreProcessor(oper -> { - oper.setOutcome(PolicyResult.SUCCESS.toString()); - return CompletableFuture.completedFuture(oper); - }); - - verifyRun("testOnSuccessTrue_testHandleFailureTrue", 1, 1, PolicyResult.SUCCESS); + public void testHandlePreprocessorFailureTrue() { + oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess())); + verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS); } /** - * Tests onSuccess() and handleFailure() when the outcome is <i>not</i> a success. + * Tests handleFailure() when the outcome is <i>not</i> a success. */ @Test - public void testOnSuccessFalse_testHandleFailureFalse() throws Exception { - // arrange to return a failure from the preprocessor - oper.setPreProcessor(oper -> { - oper.setOutcome(PolicyResult.FAILURE.toString()); - return CompletableFuture.completedFuture(oper); - }); - - verifyRun("testOnSuccessFalse_testHandleFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD); + public void testHandlePreprocessorFailureFalse() throws Exception { + oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure())); + verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD); } /** - * Tests onSuccess() and handleFailure() when the outcome is {@code null}. + * Tests handleFailure() when the outcome is {@code null}. */ @Test - public void testOnSuccessFalse_testHandleFailureNull() throws Exception { + public void testHandlePreprocessorFailureNull() throws Exception { // arrange to return null from the preprocessor - oper.setPreProcessor(oper -> { - return CompletableFuture.completedFuture(null); - }); + oper.setPreProcessor(CompletableFuture.completedFuture(null)); - verifyRun("testOnSuccessFalse_testHandleFailureNull", 1, 0, PolicyResult.FAILURE_GUARD); + verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD); } @Test @@ -688,11 +640,374 @@ public class OperatorPartialTest { } /** - * Tests verifyRunning() when the pipeline is not running. + * Tests both flavors of anyOf(), because one invokes the other. + */ + @Test + public void testAnyOf() throws Exception { + // first task completes, others do not + List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + + final OperationOutcome outcome = params.makeOutcome(); + + tasks.add(CompletableFuture.completedFuture(outcome)); + tasks.add(new CompletableFuture<>()); + tasks.add(new CompletableFuture<>()); + + CompletableFuture<OperationOutcome> result = oper.anyOf(params, tasks); + assertTrue(executor.runAll()); + + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + + // second task completes, others do not + tasks = new LinkedList<>(); + + tasks.add(new CompletableFuture<>()); + tasks.add(CompletableFuture.completedFuture(outcome)); + tasks.add(new CompletableFuture<>()); + + result = oper.anyOf(params, tasks); + assertTrue(executor.runAll()); + + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + + // third task completes, others do not + tasks = new LinkedList<>(); + + tasks.add(new CompletableFuture<>()); + tasks.add(new CompletableFuture<>()); + tasks.add(CompletableFuture.completedFuture(outcome)); + + result = oper.anyOf(params, tasks); + assertTrue(executor.runAll()); + + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + } + + /** + * Tests both flavors of allOf(), because one invokes the other. + */ + @Test + public void testAllOf() throws Exception { + List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + + final OperationOutcome outcome = params.makeOutcome(); + + CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>(); + CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>(); + CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>(); + + tasks.add(future1); + tasks.add(future2); + tasks.add(future3); + + CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks); + + assertTrue(executor.runAll()); + assertFalse(result.isDone()); + future1.complete(outcome); + + // complete 3 before 2 + assertTrue(executor.runAll()); + assertFalse(result.isDone()); + future3.complete(outcome); + + assertTrue(executor.runAll()); + assertFalse(result.isDone()); + future2.complete(outcome); + + // all of them are now done + assertTrue(executor.runAll()); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + } + + @Test + public void testCombineOutcomes() throws Exception { + // only one outcome + verifyOutcomes(0, PolicyResult.SUCCESS); + verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION); + + // maximum is in different positions + verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD); + verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD); + verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE); + + // null outcome + final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + tasks.add(CompletableFuture.completedFuture(null)); + CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks); + + assertTrue(executor.runAll()); + assertTrue(result.isDone()); + assertNull(result.get()); + + // one throws an exception during execution + IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION); + + tasks.clear(); + tasks.add(CompletableFuture.completedFuture(params.makeOutcome())); + tasks.add(CompletableFuture.failedFuture(except)); + tasks.add(CompletableFuture.completedFuture(params.makeOutcome())); + result = oper.allOf(params, tasks); + + assertTrue(executor.runAll()); + assertTrue(result.isCompletedExceptionally()); + result.whenComplete((unused, thrown) -> assertSame(except, thrown)); + } + + private void verifyOutcomes(int expected, PolicyResult... results) throws Exception { + List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + + + OperationOutcome expectedOutcome = null; + + for (int count = 0; count < results.length; ++count) { + OperationOutcome outcome = params.makeOutcome(); + outcome.setResult(results[count]); + tasks.add(CompletableFuture.completedFuture(outcome)); + + if (count == expected) { + expectedOutcome = outcome; + } + } + + CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks); + + assertTrue(executor.runAll()); + assertTrue(result.isDone()); + assertSame(expectedOutcome, result.get()); + } + + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask( + final OperationOutcome taskOutcome) { + + return outcome -> CompletableFuture.completedFuture(taskOutcome); + } + + @Test + public void testDetmPriority() { + assertEquals(1, oper.detmPriority(null)); + + OperationOutcome outcome = params.makeOutcome(); + + Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2, + PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5, + PolicyResult.FAILURE_EXCEPTION, 6); + + for (Entry<PolicyResult, Integer> ent : map.entrySet()) { + outcome.setResult(ent.getKey()); + assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome)); + } + } + + /** + * Tests doTask(Future) when the controller is not running. + */ + @Test + public void testDoTaskFutureNotRunning() throws Exception { + CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + controller.complete(params.makeOutcome()); + + CompletableFuture<OperationOutcome> future = + oper.doTask(params, controller, false, params.makeOutcome(), taskFuture); + assertFalse(future.isDone()); + assertTrue(executor.runAll()); + + // should not have run the task + assertFalse(future.isDone()); + + // should have canceled the task future + assertTrue(taskFuture.isCancelled()); + } + + /** + * Tests doTask(Future) when the previous outcome was successful. + */ + @Test + public void testDoTaskFutureSuccess() throws Exception { + CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); + final OperationOutcome taskOutcome = params.makeOutcome(); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = + oper.doTask(params, controller, true, params.makeOutcome(), taskFuture); + + taskFuture.complete(taskOutcome); + assertTrue(executor.runAll()); + + assertTrue(future.isDone()); + assertSame(taskOutcome, future.get()); + + // controller should not be done yet + assertFalse(controller.isDone()); + } + + /** + * Tests doTask(Future) when the previous outcome was failed. + */ + @Test + public void testDoTaskFutureFailure() throws Exception { + CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); + final OperationOutcome failedOutcome = params.makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, failedOutcome, taskFuture); + assertFalse(future.isDone()); + assertTrue(executor.runAll()); + + // should not have run the task + assertFalse(future.isDone()); + + // should have canceled the task future + assertTrue(taskFuture.isCancelled()); + + // controller SHOULD be done now + assertTrue(controller.isDone()); + assertSame(failedOutcome, controller.get()); + } + + /** + * Tests doTask(Future) when the previous outcome was failed, but not checking + * success. + */ + @Test + public void testDoTaskFutureUncheckedFailure() throws Exception { + CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); + final OperationOutcome failedOutcome = params.makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, failedOutcome, taskFuture); + assertFalse(future.isDone()); + + // complete the task + OperationOutcome taskOutcome = params.makeOutcome(); + taskFuture.complete(taskOutcome); + + assertTrue(executor.runAll()); + + // should have run the task + assertTrue(future.isDone()); + + assertTrue(future.isDone()); + assertSame(taskOutcome, future.get()); + + // controller should not be done yet + assertFalse(controller.isDone()); + } + + /** + * Tests doTask(Function) when the controller is not running. + */ + @Test + public void testDoTaskFunctionNotRunning() throws Exception { + AtomicBoolean invoked = new AtomicBoolean(); + + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> { + invoked.set(true); + return CompletableFuture.completedFuture(params.makeOutcome()); + }; + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + controller.complete(params.makeOutcome()); + + CompletableFuture<OperationOutcome> future = + oper.doTask(params, controller, false, task).apply(params.makeOutcome()); + assertFalse(future.isDone()); + assertTrue(executor.runAll()); + + // should not have run the task + assertFalse(future.isDone()); + + // should not have even invoked the task + assertFalse(invoked.get()); + } + + /** + * Tests doTask(Function) when the previous outcome was successful. + */ + @Test + public void testDoTaskFunctionSuccess() throws Exception { + final OperationOutcome taskOutcome = params.makeOutcome(); + + final OperationOutcome failedOutcome = params.makeOutcome(); + + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome); + + assertTrue(future.isDone()); + assertSame(taskOutcome, future.get()); + + // controller should not be done yet + assertFalse(controller.isDone()); + } + + /** + * Tests doTask(Function) when the previous outcome was failed. + */ + @Test + public void testDoTaskFunctionFailure() throws Exception { + final OperationOutcome failedOutcome = params.makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + + AtomicBoolean invoked = new AtomicBoolean(); + + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> { + invoked.set(true); + return CompletableFuture.completedFuture(params.makeOutcome()); + }; + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome); + assertFalse(future.isDone()); + assertTrue(executor.runAll()); + + // should not have run the task + assertFalse(future.isDone()); + + // should not have even invoked the task + assertFalse(invoked.get()); + + // controller should have the failed task + assertTrue(controller.isDone()); + assertSame(failedOutcome, controller.get()); + } + + /** + * Tests doTask(Function) when the previous outcome was failed, but not checking + * success. */ @Test - public void testVerifyRunningWhenNot() { - verifyRun("testVerifyRunningWhenNot", 0, 0, PolicyResult.SUCCESS, future -> future.cancel(false)); + public void testDoTaskFunctionUncheckedFailure() throws Exception { + final OperationOutcome taskOutcome = params.makeOutcome(); + + final OperationOutcome failedOutcome = params.makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, task).apply(failedOutcome); + + assertTrue(future.isDone()); + assertSame(taskOutcome, future.get()); + + // controller should not be done yet + assertFalse(controller.isDone()); } /** @@ -700,7 +1015,7 @@ public class OperatorPartialTest { */ @Test public void testCallbackStartedNotRunning() { - AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>(); + AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>(); /* * arrange to stop the controller when the start-callback is invoked, but capture @@ -723,7 +1038,7 @@ public class OperatorPartialTest { */ @Test public void testCallbackCompletedNotRunning() { - AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>(); + AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>(); // arrange to stop the controller when the start-callback is invoked params = params.toBuilder().startCallback(oper -> { @@ -739,36 +1054,36 @@ public class OperatorPartialTest { } @Test - public void testSetOutcomeControlLoopOperationThrowable() { + public void testSetOutcomeControlLoopOperationOutcomeThrowable() { final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION)); - ControlLoopOperation outcome; + OperationOutcome outcome; - outcome = new ControlLoopOperation(); + outcome = new OperationOutcome(); oper.setOutcome(params, outcome, timex); assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage()); - assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), outcome.getOutcome()); + assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult()); - outcome = new ControlLoopOperation(); - oper.setOutcome(params, outcome, new IllegalStateException()); + outcome = new OperationOutcome(); + oper.setOutcome(params, outcome, new IllegalStateException(EXPECTED_EXCEPTION)); assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage()); - assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), outcome.getOutcome()); + assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult()); } @Test - public void testSetOutcomeControlLoopOperationPolicyResult() { - ControlLoopOperation outcome; + public void testSetOutcomeControlLoopOperationOutcomePolicyResult() { + OperationOutcome outcome; - outcome = new ControlLoopOperation(); + outcome = new OperationOutcome(); oper.setOutcome(params, outcome, PolicyResult.SUCCESS); assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage()); - assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome()); + assertEquals(PolicyResult.SUCCESS, outcome.getResult()); for (PolicyResult result : FAILURE_RESULTS) { - outcome = new ControlLoopOperation(); + outcome = new OperationOutcome(); oper.setOutcome(params, outcome, result); assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage()); - assertEquals(result.toString(), result.toString(), outcome.getOutcome()); + assertEquals(result.toString(), result, outcome.getResult()); } } @@ -776,7 +1091,7 @@ public class OperatorPartialTest { public void testIsTimeout() { final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION); - assertFalse(oper.isTimeout(new IllegalStateException())); + assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION))); assertFalse(oper.isTimeout(new IllegalStateException(timex))); assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex)))); assertFalse(oper.isTimeout(new CompletionException(null))); @@ -788,19 +1103,19 @@ public class OperatorPartialTest { @Test public void testGetTimeOutMillis() { - assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(policy)); + assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(params.getTimeoutSec())); - policy.setTimeout(null); - assertEquals(0, oper.getTimeOutMillis(policy)); + params = params.toBuilder().timeoutSec(null).build(); + assertEquals(0, oper.getTimeOutMillis(params.getTimeoutSec())); } - private void starter(ControlLoopOperation oper) { + private void starter(OperationOutcome oper) { ++numStart; tstart = oper.getStart(); opstart = oper; } - private void completer(ControlLoopOperation oper) { + private void completer(OperationOutcome oper) { ++numEnd; opend = oper; } @@ -816,21 +1131,18 @@ public class OperatorPartialTest { }; } - /** - * Verifies a run. - * - * @param testName test name - * @param expectedCallbacks number of callbacks expected - * @param expectedOperations number of operation invocations expected - * @param expectedResult expected outcome - */ - private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, - PolicyResult expectedResult) { + private OperationOutcome makeSuccess() { + OperationOutcome outcome = params.makeOutcome(); + outcome.setResult(PolicyResult.SUCCESS); - String expectedSubRequestId = - (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations)); + return outcome; + } - verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop()); + private OperationOutcome makeFailure() { + OperationOutcome outcome = params.makeOutcome(); + outcome.setResult(PolicyResult.FAILURE); + + return outcome; } /** @@ -840,17 +1152,14 @@ public class OperatorPartialTest { * @param expectedCallbacks number of callbacks expected * @param expectedOperations number of operation invocations expected * @param expectedResult expected outcome - * @param manipulator function to modify the future returned by - * {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before - * the tasks in the executor are run */ - private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult, - Consumer<CompletableFuture<ControlLoopOperation>> manipulator) { + private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, + PolicyResult expectedResult) { String expectedSubRequestId = (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations)); - verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, manipulator); + verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop()); } /** @@ -866,9 +1175,9 @@ public class OperatorPartialTest { * the tasks in the executor are run */ private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult, - String expectedSubRequestId, Consumer<CompletableFuture<ControlLoopOperation>> manipulator) { + String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) { - CompletableFuture<ControlLoopOperation> future = oper.startOperation(params); + CompletableFuture<OperationOutcome> future = oper.startOperation(params); manipulator.accept(future); @@ -880,7 +1189,7 @@ public class OperatorPartialTest { if (expectedCallbacks > 0) { assertNotNull(testName, opstart); assertNotNull(testName, opend); - assertEquals(testName, expectedResult.toString(), opend.getOutcome()); + assertEquals(testName, expectedResult, opend.getResult()); assertSame(testName, tstart, opstart.getStart()); assertSame(testName, tstart, opend.getStart()); @@ -901,7 +1210,7 @@ public class OperatorPartialTest { assertEquals(testName, expectedOperations, oper.getCount()); } - private static class MyOper extends OperatorPartial { + private class MyOper extends OperatorPartial { @Getter private int count = 0; @@ -912,15 +1221,15 @@ public class OperatorPartialTest { private int maxFailures = 0; @Setter - private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preProcessor; + private CompletableFuture<OperationOutcome> preProcessor; public MyOper() { super(ACTOR, OPERATOR); } @Override - protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, - ControlLoopOperation operation) { + protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, + OperationOutcome operation) { ++count; if (genException) { throw new IllegalStateException(EXPECTED_EXCEPTION); @@ -929,19 +1238,22 @@ public class OperatorPartialTest { operation.setSubRequestId(String.valueOf(attempt)); if (count > maxFailures) { - operation.setOutcome(PolicyResult.SUCCESS.toString()); + operation.setResult(PolicyResult.SUCCESS); } else { - operation.setOutcome(PolicyResult.FAILURE.toString()); + operation.setResult(PolicyResult.FAILURE); } return operation; } @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { + return (preProcessor != null ? preProcessor : super.startPreprocessorAsync(params)); + } - return (preProcessor != null ? preProcessor : super.doPreprocessorAsFuture(params)); + @Override + protected Executor getBlockingExecutor() { + return executor; } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java index 0c8e77d38..9dd19d548 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java @@ -35,6 +35,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -47,20 +49,23 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.policy.common.parameters.BeanValidationResult; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.VirtualControlLoopEvent; import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Operator; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams.ControlLoopOperationParamsBuilder; import org.onap.policy.controlloop.actorserviceprovider.spi.Actor; -import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.Target; public class ControlLoopOperationParamsTest { private static final String EXPECTED_EXCEPTION = "expected exception"; private static final String ACTOR = "my-actor"; private static final String OPERATION = "my-operation"; - private static final String TARGET = "my-target"; + private static final Target TARGET = new Target(); + private static final String TARGET_ENTITY = "my-target"; + private static final Integer RETRY = 3; + private static final Integer TIMEOUT = 100; private static final UUID REQ_ID = UUID.randomUUID(); @Mock @@ -70,7 +75,7 @@ public class ControlLoopOperationParamsTest { private ActorService actorService; @Mock - private Consumer<ControlLoopOperation> completer; + private Consumer<OperationOutcome> completer; @Mock private ControlLoopEventContext context; @@ -82,19 +87,18 @@ public class ControlLoopOperationParamsTest { private Executor executor; @Mock - private CompletableFuture<ControlLoopOperation> operation; + private CompletableFuture<OperationOutcome> operation; @Mock private Operator operator; @Mock - private Policy policy; + private Consumer<OperationOutcome> starter; - @Mock - private Consumer<ControlLoopOperation> starter; + private Map<String, String> payload; private ControlLoopOperationParams params; - private ControlLoopOperation outcome; + private OperationOutcome outcome; /** @@ -112,12 +116,12 @@ public class ControlLoopOperationParamsTest { when(context.getEvent()).thenReturn(event); - when(policy.getActor()).thenReturn(ACTOR); - when(policy.getRecipe()).thenReturn(OPERATION); + payload = new TreeMap<>(); params = ControlLoopOperationParams.builder().actorService(actorService).completeCallback(completer) - .context(context).executor(executor).policy(policy).startCallback(starter).target(TARGET) - .build(); + .context(context).executor(executor).actor(ACTOR).operation(OPERATION).payload(payload) + .retry(RETRY).target(TARGET).targetEntity(TARGET_ENTITY).timeoutSec(TIMEOUT) + .startCallback(starter).build(); outcome = params.makeOutcome(); } @@ -130,30 +134,6 @@ public class ControlLoopOperationParamsTest { } @Test - public void testGetActor() { - assertEquals(ACTOR, params.getActor()); - - // try with null policy - assertEquals(ControlLoopOperationParams.UNKNOWN, params.toBuilder().policy(null).build().getActor()); - - // try with null name in the policy - when(policy.getActor()).thenReturn(null); - assertEquals(ControlLoopOperationParams.UNKNOWN, params.getActor()); - } - - @Test - public void testGetOperation() { - assertEquals(OPERATION, params.getOperation()); - - // try with null policy - assertEquals(ControlLoopOperationParams.UNKNOWN, params.toBuilder().policy(null).build().getOperation()); - - // try with null name in the policy - when(policy.getRecipe()).thenReturn(null); - assertEquals(ControlLoopOperationParams.UNKNOWN, params.getOperation()); - } - - @Test public void testGetRequestId() { assertSame(REQ_ID, params.getRequestId()); @@ -170,20 +150,14 @@ public class ControlLoopOperationParamsTest { assertEquals(ACTOR, outcome.getActor()); assertEquals(OPERATION, outcome.getOperation()); checkRemainingFields("with actor"); - - // try again with a null policy - outcome = params.toBuilder().policy(null).build().makeOutcome(); - assertEquals(ControlLoopOperationParams.UNKNOWN, outcome.getActor()); - assertEquals(ControlLoopOperationParams.UNKNOWN, outcome.getOperation()); - checkRemainingFields("unknown actor"); } protected void checkRemainingFields(String testName) { - assertEquals(testName, TARGET, outcome.getTarget()); - assertNotNull(testName, outcome.getStart()); + assertEquals(testName, TARGET_ENTITY, outcome.getTarget()); + assertNull(testName, outcome.getStart()); assertNull(testName, outcome.getEnd()); assertNull(testName, outcome.getSubRequestId()); - assertNull(testName, outcome.getOutcome()); + assertNotNull(testName, outcome.getResult()); assertNull(testName, outcome.getMessage()); } @@ -239,21 +213,23 @@ public class ControlLoopOperationParamsTest { @Test public void testValidateFields() { + testValidate("actor", "null", bldr -> bldr.actor(null)); testValidate("actorService", "null", bldr -> bldr.actorService(null)); testValidate("context", "null", bldr -> bldr.context(null)); testValidate("executor", "null", bldr -> bldr.executor(null)); - testValidate("policy", "null", bldr -> bldr.policy(null)); - testValidate("target", "null", bldr -> bldr.target(null)); + testValidate("operation", "null", bldr -> bldr.operation(null)); + testValidate("target", "null", bldr -> bldr.targetEntity(null)); // check edge cases assertTrue(params.toBuilder().build().validate().isValid()); // these can be null - assertTrue(params.toBuilder().startCallback(null).completeCallback(null).build().validate().isValid()); + assertTrue(params.toBuilder().payload(null).retry(null).target(null).timeoutSec(null).startCallback(null) + .completeCallback(null).build().validate().isValid()); // test with minimal fields - assertTrue(ControlLoopOperationParams.builder().actorService(actorService).context(context).policy(policy) - .target(TARGET).build().validate().isValid()); + assertTrue(ControlLoopOperationParams.builder().actorService(actorService).context(context).actor(ACTOR) + .operation(OPERATION).targetEntity(TARGET_ENTITY).build().validate().isValid()); } private void testValidate(String fieldName, String expected, @@ -275,7 +251,12 @@ public class ControlLoopOperationParamsTest { } @Test - public void testActorService() { + public void testGetActor() { + assertSame(ACTOR, params.getActor()); + } + + @Test + public void testGetActorService() { assertSame(actorService, params.getActorService()); } @@ -293,8 +274,43 @@ public class ControlLoopOperationParamsTest { } @Test - public void testGetPolicy() { - assertSame(policy, params.getPolicy()); + public void testGetOperation() { + assertSame(OPERATION, params.getOperation()); + } + + @Test + public void testGetPayload() { + assertSame(payload, params.getPayload()); + + // should be null when unspecified + assertNull(ControlLoopOperationParams.builder().build().getPayload()); + } + + @Test + public void testGetRetry() { + assertSame(RETRY, params.getRetry()); + + // should be null when unspecified + assertNull(ControlLoopOperationParams.builder().build().getRetry()); + } + + @Test + public void testTarget() { + assertSame(TARGET, params.getTarget()); + + // should be null when unspecified + assertNull(ControlLoopOperationParams.builder().build().getTarget()); + } + + @Test + public void testGetTimeoutSec() { + assertSame(TIMEOUT, params.getTimeoutSec()); + + // should be 300 when unspecified + assertEquals(Integer.valueOf(300), ControlLoopOperationParams.builder().build().getTimeoutSec()); + + // null should be ok too + assertNull(ControlLoopOperationParams.builder().timeoutSec(null).build().getTimeoutSec()); } @Test @@ -308,7 +324,7 @@ public class ControlLoopOperationParamsTest { } @Test - public void testGetTarget() { - assertEquals(TARGET, params.getTarget()); + public void testGetTargetEntity() { + assertEquals(TARGET_ENTITY, params.getTargetEntity()); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java index 1763388f2..6c1f538ec 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java @@ -90,6 +90,8 @@ public class HttpActorParamsTest { @Test public void testValidate() { + assertTrue(params.validate(CONTAINER).isValid()); + testValidateField("clientName", "null", params2 -> params2.setClientName(null)); testValidateField("path", "null", params2 -> params2.setPath(null)); testValidateField("timeoutSec", "minimum", params2 -> params2.setTimeoutSec(-1)); diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java index 829c480d1..6cf7328ca 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java @@ -47,6 +47,8 @@ public class HttpParamsTest { @Test public void testValidate() { + assertTrue(params.validate(CONTAINER).isValid()); + testValidateField("clientName", "null", bldr -> bldr.clientName(null)); testValidateField("path", "null", bldr -> bldr.path(null)); testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1)); diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java index b421c1ce2..a6b11ef65 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java @@ -23,21 +23,27 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -58,9 +64,10 @@ public class PipelineControllerFutureTest { private Future<String> future2; @Mock - private CompletableFuture<String> compFuture; + private Executor executor; + private CompletableFuture<String> compFuture; private PipelineControllerFuture<String> controller; @@ -72,6 +79,8 @@ public class PipelineControllerFutureTest { public void setUp() { MockitoAnnotations.initMocks(this); + compFuture = spy(new CompletableFuture<>()); + controller = new PipelineControllerFuture<>(); controller.add(runnable1); @@ -89,10 +98,7 @@ public class PipelineControllerFutureTest { assertTrue(controller.isCancelled()); assertFalse(controller.isRunning()); - verify(runnable1).run(); - verify(runnable2).run(); - verify(future1).cancel(anyBoolean()); - verify(future2).cancel(anyBoolean()); + verifyStopped(); // re-invoke; nothing should change assertTrue(controller.cancel(true)); @@ -100,10 +106,155 @@ public class PipelineControllerFutureTest { assertTrue(controller.isCancelled()); assertFalse(controller.isRunning()); - verify(runnable1).run(); - verify(runnable2).run(); - verify(future1).cancel(anyBoolean()); - verify(future2).cancel(anyBoolean()); + verifyStopped(); + } + + @Test + public void testCompleteT() throws Exception { + assertTrue(controller.complete(TEXT)); + assertEquals(TEXT, controller.get()); + + verifyStopped(); + + // repeat - disallowed + assertFalse(controller.complete(TEXT)); + } + + @Test + public void testCompleteExceptionallyThrowable() { + assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION)); + assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION); + + verifyStopped(); + + // repeat - disallowed + assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION)); + } + + @Test + public void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception { + CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor); + + // haven't stopped anything yet + assertFalse(future.isDone()); + verify(runnable1, never()).run(); + + // get the operation and run it + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(executor).execute(captor.capture()); + captor.getValue().run(); + + // should be done now + assertTrue(future.isDone()); + + assertEquals(TEXT, future.get()); + + verifyStopped(); + } + + /** + * Tests completeAsync(executor) when canceled before execution. + */ + @Test + public void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() throws Exception { + CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor); + + assertTrue(future.cancel(false)); + + verifyStopped(); + + assertTrue(future.isDone()); + + assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class); + } + + @Test + public void testCompleteAsyncSupplierOfQextendsT() throws Exception { + CompletableFuture<String> future = controller.completeAsync(() -> TEXT); + assertEquals(TEXT, future.get()); + + verifyStopped(); + } + + /** + * Tests completeAsync() when canceled. + */ + @Test + public void testCompleteAsyncSupplierOfQextendsTCanceled() throws Exception { + CountDownLatch canceled = new CountDownLatch(1); + + // run async, but await until canceled + CompletableFuture<String> future = controller.completeAsync(() -> { + try { + canceled.await(); + } catch (InterruptedException e) { + // do nothing + } + + return TEXT; + }); + + assertTrue(future.cancel(false)); + + // let the future run now + canceled.countDown(); + + verifyStopped(); + + assertTrue(future.isDone()); + + assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class); + } + + @Test + public void testCompleteOnTimeoutTLongTimeUnit() throws Exception { + CountDownLatch stopped = new CountDownLatch(1); + controller.add(() -> stopped.countDown()); + + CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS); + + assertEquals(TEXT, future.get()); + + /* + * Must use latch instead of verifyStopped(), because the runnables may be + * executed asynchronously. + */ + assertTrue(stopped.await(5, TimeUnit.SECONDS)); + } + + /** + * Tests completeOnTimeout() when completed before the timeout. + */ + @Test + public void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception { + CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS); + controller.complete(TEXT); + + assertEquals(TEXT, future.get()); + + verifyStopped(); + } + + /** + * Tests completeOnTimeout() when canceled before the timeout. + */ + @Test + public void testCompleteOnTimeoutTLongTimeUnitCanceled() { + CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS); + assertTrue(future.cancel(true)); + + assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class); + + verifyStopped(); + } + + @Test + public void testNewIncompleteFuture() { + PipelineControllerFuture<String> future = controller.newIncompleteFuture(); + assertNotNull(future); + assertTrue(future instanceof PipelineControllerFuture); + assertNotSame(controller, future); + assertFalse(future.isDone()); } @Test @@ -208,22 +359,81 @@ public class PipelineControllerFutureTest { verify(future2).cancel(anyBoolean()); } + /** + * Tests both wrap() methods. + */ @Test - public void testAddFunction() { - AtomicReference<String> value = new AtomicReference<>(); + public void testWrap() throws Exception { + controller = spy(controller); - Function<String, CompletableFuture<String>> func = controller.add(input -> { - value.set(input); + CompletableFuture<String> future = controller.wrap(compFuture); + verify(controller, never()).remove(compFuture); + + compFuture.complete(TEXT); + assertEquals(TEXT, future.get()); + + verify(controller).remove(compFuture); + } + + /** + * Tests wrap(), when the controller is not running. + */ + @Test + public void testWrapNotRunning() throws Exception { + controller.cancel(false); + controller = spy(controller); + + assertFalse(controller.wrap(compFuture).isDone()); + verify(controller, never()).add(compFuture); + verify(controller, never()).remove(compFuture); + + verify(compFuture).cancel(anyBoolean()); + } + + /** + * Tests wrap(), when the future throws an exception. + */ + @Test + public void testWrapException() throws Exception { + controller = spy(controller); + + CompletableFuture<String> future = controller.wrap(compFuture); + verify(controller, never()).remove(compFuture); + + compFuture.completeExceptionally(EXPECTED_EXCEPTION); + assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION); + + verify(controller).remove(compFuture); + } + + @Test + public void testWrapFunction() throws Exception { + + Function<String, CompletableFuture<String>> func = controller.wrap(input -> { + compFuture.complete(input); return compFuture; }); - assertSame(compFuture, func.apply(TEXT)); - assertEquals(TEXT, value.get()); + CompletableFuture<String> future = func.apply(TEXT); + assertTrue(compFuture.isDone()); - verify(compFuture, never()).cancel(anyBoolean()); + assertEquals(TEXT, future.get()); // should not have completed the controller assertFalse(controller.isDone()); + } + + /** + * Tests add(Function) when the controller is canceled after the future is added. + */ + @Test + public void testWrapFunctionCancel() throws Exception { + Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture); + + CompletableFuture<String> future = func.apply(TEXT); + assertFalse(future.isDone()); + + assertFalse(compFuture.isDone()); // cancel - should propagate controller.cancel(false); @@ -235,10 +445,10 @@ public class PipelineControllerFutureTest { * Tests add(Function) when the controller is not running. */ @Test - public void testAddFunctionNotRunning() { + public void testWrapFunctionNotRunning() { AtomicReference<String> value = new AtomicReference<>(); - Function<String, CompletableFuture<String>> func = controller.add(input -> { + Function<String, CompletableFuture<String>> func = controller.wrap(input -> { value.set(input); return compFuture; }); @@ -251,4 +461,11 @@ public class PipelineControllerFutureTest { assertNull(value.get()); } + + private void verifyStopped() { + verify(runnable1).run(); + verify(runnable2).run(); + verify(future1).cancel(anyBoolean()); + verify(future2).cancel(anyBoolean()); + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml index f8a1e5112..c7fe46e47 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml +++ b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml @@ -35,8 +35,8 @@ <appender-ref ref="STDOUT" /> </root> - <!-- this is just an example --> - <logger name="ch.qos.logback.classic" level="off" additivity="false"> + <!-- this is required for UtilTest --> + <logger name="org.onap.policy.controlloop.actorserviceprovider.Util" level="info" additivity="false"> <appender-ref ref="STDOUT" /> </logger> </configuration> |