diff options
author | Jim Hahn <jrh3@att.com> | 2020-02-06 21:48:12 -0500 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2020-02-07 13:21:52 -0500 |
commit | e06578535f6afadac715c04ed03c74c05a075780 (patch) | |
tree | 1c9a0141daf15b93cb4f6452703d92cf8a6d751c /models-interactions/model-actors/actorServiceProvider | |
parent | accad88260f99c1b5c5329285b73aa84349e623b (diff) |
Clean up and enhancement of Actor re-design
Added junits for the remaining code.
Enhancements to facilitate implementation of Operators:
- Added allOf(), anyOf() facilities
- Added AsyncResponseHandler for handling asynchronous I/O via the
HttpClient
- Added logRestRequest() and logRestResponse() for logging REST
requests and responses
- Added HttpActor and HttpOperator, which can be used as superclasses
- Added doTask()
- Lifted data from the event into ControlLoopEventContext
Updates per previous review comments:
- Changed logException() to runFunction().
- Removed the aaiCqResponse field.
- Lifted fields from Policy into ControlLoopOperationParams, eliminating
the need to include Policy in the class.
OperatorPartial depends on the string values in the ControlLoopOperation
being set to one of the string values of PolicyResult. Instead of
passing ControlLoopOperation around, the operators should pass around
an object that uses PolicyResult directly, rather than depending on
the string values being set correctly. Created OperationOutcome for
this purpose.
Stop pipeline when the controller completes.
Use whenComplete() where appropriate.
startOperationAsync() should not block. Modified it to launch the task
in the background via its own thread.
Extracted CallbackManager into its own file.
Replaced actor setOperators() with addOperator()
Renamed add() to wrap(), and modified it to remove the future when it
completes.
Fixed the signature on delayedRemove() and delayedComplete().
Replaced xxxAsync() calls with just xxx() calls, where appropriate to
avoid the extra overhead of submitting it to a work queue.
Renamed handleFailure() to handlePreprocessorFailure().
Updates per WIP review comments
Issue-ID: POLICY-1625
Signed-off-by: Jim Hahn <jrh3@att.com>
Change-Id: Id4c4c7ade979bdb76cc54266837609cc69a22c58
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider')
28 files changed, 2727 insertions, 720 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java index 13f09b1ad..2886b1feb 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java @@ -63,7 +63,6 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { return newActor; } - // TODO: should this throw an exception? logger.warn("duplicate actor names for {}: {}, ignoring {}", name, existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName()); return existingActor; @@ -158,7 +157,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { for (Actor actor : name2actor.values()) { if (actor.isConfigured()) { - Util.logException(actor::start, "failed to start actor {}", actor.getName()); + Util.runFunction(actor::start, "failed to start actor {}", actor.getName()); } else { logger.warn("not starting unconfigured actor {}", actor.getName()); @@ -170,7 +169,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { protected void doStop() { logger.info("stopping actors"); name2actor.values() - .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName())); + .forEach(actor -> Util.runFunction(actor::stop, "failed to stop actor {}", actor.getName())); } @Override @@ -179,7 +178,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { // @formatter:off name2actor.values().forEach( - actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName())); + actor -> Util.runFunction(actor::shutdown, "failed to shutdown actor {}", actor.getName())); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java new file mode 100644 index 000000000..d78403809 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java @@ -0,0 +1,119 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import javax.ws.rs.client.InvocationCallback; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handler for a <i>single</i> asynchronous response. + * + * @param <T> response type + */ +@Getter +public abstract class AsyncResponseHandler<T> implements InvocationCallback<T> { + + private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class); + + @Getter(AccessLevel.NONE) + private final PipelineControllerFuture<OperationOutcome> result = new PipelineControllerFuture<>(); + private final ControlLoopOperationParams params; + private final OperationOutcome outcome; + + /** + * Constructs the object. + * + * @param params operation parameters + * @param outcome outcome to be populated based on the response + */ + public AsyncResponseHandler(ControlLoopOperationParams params, OperationOutcome outcome) { + this.params = params; + this.outcome = outcome; + } + + /** + * Handles the given future, arranging to cancel it when the response is received. + * + * @param future future to be handled + * @return a future to be used to cancel or wait for the response + */ + public CompletableFuture<OperationOutcome> handle(Future<T> future) { + result.add(future); + return result; + } + + /** + * Invokes {@link #doComplete()} and then completes "this" with the returned value. + */ + @Override + public void completed(T rawResponse) { + try { + logger.trace("{}.{}: response completed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.complete(doComplete(rawResponse)); + + } catch (RuntimeException e) { + logger.trace("{}.{}: response handler threw an exception for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.completeExceptionally(e); + } + } + + /** + * Invokes {@link #doFailed()} and then completes "this" with the returned value. + */ + @Override + public void failed(Throwable throwable) { + try { + logger.trace("{}.{}: response failure for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + result.complete(doFailed(throwable)); + + } catch (RuntimeException e) { + logger.trace("{}.{}: response failure handler threw an exception for {}", params.getActor(), + params.getOperation(), params.getRequestId()); + result.completeExceptionally(e); + } + } + + /** + * Completes the processing of a response. + * + * @param rawResponse raw response that was received + * @return the outcome + */ + protected abstract OperationOutcome doComplete(T rawResponse); + + /** + * Handles a response exception. + * + * @param thrown exception that was thrown + * @return the outcome + */ + protected abstract OperationOutcome doFailed(Throwable thrown); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java new file mode 100644 index 000000000..7d7c1d902 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Manager for "start" and "end" callbacks. + */ +public class CallbackManager implements Runnable { + private final AtomicReference<Instant> startTime = new AtomicReference<>(); + private final AtomicReference<Instant> endTime = new AtomicReference<>(); + + /** + * Determines if the "start" callback can be invoked. If so, it sets the + * {@link #startTime} to the current time. + * + * @return {@code true} if the "start" callback can be invoked, {@code false} + * otherwise + */ + public boolean canStart() { + return startTime.compareAndSet(null, Instant.now()); + } + + /** + * Determines if the "end" callback can be invoked. If so, it sets the + * {@link #endTime} to the current time. + * + * @return {@code true} if the "end" callback can be invoked, {@code false} + * otherwise + */ + public boolean canEnd() { + return endTime.compareAndSet(null, Instant.now()); + } + + /** + * Gets the start time. + * + * @return the start time, or {@code null} if {@link #canStart()} has not been + * invoked yet. + */ + public Instant getStartTime() { + return startTime.get(); + } + + /** + * Gets the end time. + * + * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked + * yet. + */ + public Instant getEndTime() { + return endTime.get(); + } + + /** + * Prevents further callbacks from being executed by setting {@link #startTime} + * and {@link #endTime}. + */ + @Override + public void run() { + canStart(); + canEnd(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java new file mode 100644 index 000000000..6b0924807 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java @@ -0,0 +1,116 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.time.Instant; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.policy.PolicyResult; + +/** + * Outcome from an operation. Objects of this type are passed from one stage to the next. + */ +@Data +@NoArgsConstructor +public class OperationOutcome { + private String actor; + private String operation; + private String target; + private Instant start; + private Instant end; + private String subRequestId; + private PolicyResult result = PolicyResult.SUCCESS; + private String message; + + /** + * Copy constructor. + * + * @param source source object from which to copy + */ + public OperationOutcome(OperationOutcome source) { + this.actor = source.actor; + this.operation = source.operation; + this.target = source.target; + this.start = source.start; + this.end = source.end; + this.subRequestId = source.subRequestId; + this.result = source.result; + this.message = source.message; + } + + /** + * Creates a {@link ControlLoopOperation}, populating all fields with the values from + * this object. Sets the outcome field to the string representation of this object's + * outcome. + * + * @return + */ + public ControlLoopOperation toControlLoopOperation() { + ControlLoopOperation clo = new ControlLoopOperation(); + + clo.setActor(actor); + clo.setOperation(operation); + clo.setTarget(target); + clo.setStart(start); + clo.setEnd(end); + clo.setSubRequestId(subRequestId); + clo.setOutcome(result.toString()); + clo.setMessage(message); + + return clo; + } + + /** + * Determines if this outcome is for the given actor and operation. + * + * @param actor actor name + * @param operation operation name + * @return {@code true} if this outcome is for the given actor and operation + */ + public boolean isFor(@NonNull String actor, @NonNull String operation) { + // do the operation check first, as it's most likely to be unique + return (operation.equals(this.operation) && actor.equals(this.actor)); + } + + /** + * Determines if an outcome is for the given actor and operation. + * + * @param outcome outcome to be examined, or {@code null} + * @param actor actor name + * @param operation operation name + * @return {@code true} if this outcome is for the given actor and operation, + * {@code false} it is {@code null} or not for the actor/operation + */ + public static boolean isFor(OperationOutcome outcome, String actor, String operation) { + return (outcome != null && outcome.isFor(actor, operation)); + } + + /** + * Sets the result. + * + * @param result new result + */ + public void setResult(@NonNull PolicyResult result) { + this.result = result; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java index e308ee42e..c09460e34 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.onap.policy.common.capabilities.Configurable; import org.onap.policy.common.capabilities.Startable; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; /** @@ -54,5 +53,5 @@ public interface Operator extends Startable, Configurable<Map<String, Object>> { * @param params parameters needed to start the operation * @return a future that can be used to cancel or await the result of the operation */ - CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params); + CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java index 0aba1a7fa..c3ddd17f3 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java @@ -23,6 +23,9 @@ package org.onap.policy.controlloop.actorserviceprovider; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.Map; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; @@ -53,6 +56,82 @@ public class Util { } /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param request request to be logged + */ + public static <T> void logRestRequest(String url, T request) { + logRestRequest(new StandardCoder(), url, request); + } + + /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param coder coder to be used to pretty-print the request + * @param url request URL + * @param request request to be logged + */ + protected static <T> void logRestRequest(Coder coder, String url, T request) { + String json; + try { + if (request instanceof String) { + json = request.toString(); + } else { + json = coder.encode(request, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print request", e); + json = request.toString(); + } + + NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); + logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + /** + * Logs a REST response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param response response to be logged + */ + public static <T> void logRestResponse(String url, T response) { + logRestResponse(new StandardCoder(), url, response); + } + + /** + * Logs a REST response. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param coder coder to be used to pretty-print the response + * @param url request URL + * @param response response to be logged + */ + protected static <T> void logRestResponse(Coder coder, String url, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = coder.encode(response, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print response", e); + json = response.toString(); + } + + NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); + logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + /** * Runs a function and logs a message if it throws an exception. Does <i>not</i> * re-throw the exception. * @@ -60,7 +139,7 @@ public class Util { * @param exceptionMessage message to log if an exception is thrown * @param exceptionArgs arguments to be passed to the logger */ - public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) { + public static void runFunction(Runnable function, String exceptionMessage, Object... exceptionArgs) { try { function.run(); diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java index 68bbe7edc..cd4d2570f 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java @@ -22,11 +22,12 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop; import java.io.Serializable; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; -import org.onap.policy.aai.AaiCqResponse; import org.onap.policy.controlloop.VirtualControlLoopEvent; /** @@ -37,23 +38,35 @@ import org.onap.policy.controlloop.VirtualControlLoopEvent; public class ControlLoopEventContext implements Serializable { private static final long serialVersionUID = 1L; - @Setter(AccessLevel.NONE) + private final VirtualControlLoopEvent event; - private AaiCqResponse aaiCqResponse; + /** + * Enrichment data extracted from the event. Never {@code null}, though it may be + * immutable. + */ + private final Map<String, String> enrichment; - // TODO may remove this if it proves not to be needed @Getter(AccessLevel.NONE) @Setter(AccessLevel.NONE) private Map<String, Serializable> properties = new ConcurrentHashMap<>(); /** + * Request ID extracted from the event, or a generated value if the event has no + * request id; never {@code null}. + */ + private final UUID requestId; + + + /** * Constructs the object. * * @param event event with which this is associated */ - public ControlLoopEventContext(VirtualControlLoopEvent event) { + public ControlLoopEventContext(@NonNull VirtualControlLoopEvent event) { this.event = event; + this.requestId = (event.getRequestId() != null ? event.getRequestId() : UUID.randomUUID()); + this.enrichment = (event.getAai() != null ? event.getAai() : Map.of()); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java index 9b9aa914e..d7f322e8a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java @@ -20,14 +20,12 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import com.google.common.collect.ImmutableMap; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.controlloop.actorserviceprovider.Operator; @@ -46,44 +44,42 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement /** * Maps a name to an operator. */ - private Map<String, Operator> name2operator; + private final Map<String, Operator> name2operator = new ConcurrentHashMap<>(); /** * Constructs the object. * * @param name actor name - * @param operators the operations supported by this actor */ - public ActorImpl(String name, Operator... operators) { + public ActorImpl(String name) { super(name); - setOperators(Arrays.asList(operators)); } /** - * Sets the operators supported by this actor, overriding any previous list. + * Adds an operator supported by this actor. * - * @param operators the operations supported by this actor + * @param operator operation to be added */ - protected void setOperators(List<Operator> operators) { + protected synchronized void addOperator(Operator operator) { + /* + * This method is "synchronized" to prevent the state from changing while the + * operator is added. The map, itself, does not need synchronization as it's a + * concurrent map. + */ + if (isConfigured()) { throw new IllegalStateException("attempt to set operators on a configured actor: " + getName()); } - Map<String, Operator> map = new HashMap<>(); - for (Operator newOp : operators) { - map.compute(newOp.getName(), (opName, existingOp) -> { - if (existingOp == null) { - return newOp; - } - - // TODO: should this throw an exception? - logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, - existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName()); - return existingOp; - }); - } + name2operator.compute(operator.getName(), (opName, existingOp) -> { + if (existingOp == null) { + return operator; + } - this.name2operator = ImmutableMap.copyOf(map); + logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName, + existingOp.getClass().getSimpleName(), operator.getClass().getSimpleName()); + return existingOp; + }); } @Override @@ -177,7 +173,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement for (Operator oper : name2operator.values()) { if (oper.isConfigured()) { - Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); + Util.runFunction(oper::start, "failed to start operation {}.{}", actorName, oper.getName()); } else { logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName()); @@ -195,7 +191,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement // @formatter:off name2operator.values().forEach( - oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); + oper -> Util.runFunction(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName())); // @formatter:on } @@ -208,7 +204,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement logger.info("shutting down operations for actor {}", actorName); // @formatter:off - name2operator.values().forEach(oper -> Util.logException(oper::shutdown, + name2operator.values().forEach(oper -> Util.runFunction(oper::shutdown, "failed to shutdown operation {}.{}", actorName, oper.getName())); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java new file mode 100644 index 000000000..28b7b3924 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.function.Function; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams; + +/** + * Actor that uses HTTP, where the only additional property that an operator needs is a + * URL. The actor's parameters must be an {@link HttpActorParams} and its operator + * parameters are expected to be an {@link HttpParams}. + */ +public class HttpActor extends ActorImpl { + + /** + * Constructs the object. + * + * @param name actor's name + */ + public HttpActor(String name) { + super(name); + } + + /** + * Translates the parameters to an {@link HttpActorParams} and then creates a function + * that will extract operator-specific parameters. + */ + @Override + protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { + String actorName = getName(); + + // @formatter:off + return Util.translate(actorName, actorParameters, HttpActorParams.class) + .doValidation(actorName) + .makeOperationParameters(actorName); + // @formatter:on + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java new file mode 100644 index 000000000..566492907 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.onap.policy.common.endpoints.http.client.HttpClientFactory; +import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +/** + * Operator that uses HTTP. The operator's parameters must be a {@link HttpParams}. + */ +public class HttpOperator extends OperatorPartial { + + @Getter(AccessLevel.PROTECTED) + private HttpClient client; + + @Getter + private long timeoutSec; + + /** + * URI path for this particular operation. + */ + @Getter + private String path; + + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + */ + public HttpOperator(String actorName, String name) { + super(actorName, name); + } + + /** + * Translates the parameters to an {@link HttpParams} and then extracts the relevant + * values. + */ + @Override + protected void doConfigure(Map<String, Object> parameters) { + HttpParams params = Util.translate(getFullName(), parameters, HttpParams.class); + ValidationResult result = params.validate(getFullName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + client = getClientFactory().get(params.getClientName()); + path = params.getPath(); + timeoutSec = params.getTimeoutSec(); + } + + // these may be overridden by junits + + protected HttpClientFactory getClientFactory() { + return HttpClientFactoryInstance.getClientFactory(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java index 80d8fbd04..df5258d71 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java @@ -20,37 +20,61 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Function; +import lombok.AccessLevel; import lombok.Getter; +import lombok.Setter; import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Operator; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.policy.Policy; import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Partial implementation of an operator. Subclasses can choose to simply implement - * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override - * {@link #doOperationAsFuture(ControlLoopOperationParams)}. + * Partial implementation of an operator. In general, it's preferable that subclasses + * would override + * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome) + * startOperationAsync()}. However, if that proves to be too difficult, then they can + * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) + * doOperation()}. In addition, if the operation requires any preprocessor steps, the + * subclass may choose to override + * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}. + * <p/> + * The futures returned by the methods within this class can be canceled, and will + * propagate the cancellation to any subtasks. Thus it is also expected that any futures + * returned by overridden methods will do the same. Of course, if a class overrides + * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()}, + * then there's little that can be done to cancel that particular operation. */ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator { private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class); - private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString(); - private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString(); - private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString(); + /** + * Executor to be used for tasks that may perform blocking I/O. The default executor + * simply launches a new thread for each command that is submitted to it. + * <p/> + * May be overridden by junit tests. + */ + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private Executor blockingExecutor = command -> { + Thread thread = new Thread(command); + thread.setDaemon(true); + thread.start(); + }; @Getter private final String actorName; @@ -103,94 +127,52 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } @Override - public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) { + public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) { if (!isAlive()) { throw new IllegalStateException("operation is not running: " + getFullName()); } - final Executor executor = params.getExecutor(); - // allocate a controller for the entire operation - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params); + CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params); if (preproc == null) { // no preprocessor required - just start the operation return startOperationAttempt(params, controller, 1); } - // propagate "stop" to the preprocessor - controller.add(preproc); - /* * Do preprocessor first and then, if successful, start the operation. Note: * operations create their own outcome, ignoring the outcome from any previous * steps. + * + * Wrap the preprocessor to ensure "stop" is propagated to it. */ - preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor) - .thenComposeAsync(handleFailure(params, controller), executor) - .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)), - executor); - - return controller; - } - - /** - * Starts an operation's preprocessor step(s). If the preprocessor fails, then it - * invokes the started and completed call-backs. - * - * @param params operation parameters - * @return a future that will return the preprocessor outcome, or {@code null} if this - * operation needs no preprocessor - */ - protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) { - logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId()); - - final Executor executor = params.getExecutor(); - final ControlLoopOperation operation = params.makeOutcome(); - - final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc = - doPreprocessorAsFuture(params); - if (preproc == null) { - // no preprocessor required - return null; - } - - // allocate a controller for the preprocessor steps - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); - - /* - * Don't mark it complete until we've built the whole pipeline. This will prevent - * the operation from starting until after it has been successfully built (i.e., - * without generating any exceptions). - */ - final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>(); - // @formatter:off - firstFuture - .thenComposeAsync(controller.add(preproc), executor) - .exceptionally(fromException(params, operation)) - .whenCompleteAsync(controller.delayedComplete(), executor); + controller.wrap(preproc) + .exceptionally(fromException(params, "preprocessor of operation")) + .thenCompose(handlePreprocessorFailure(params, controller)) + .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1)); // @formatter:on - // start the pipeline - firstFuture.complete(operation); - return controller; } /** * Handles a failure in the preprocessor pipeline. If a failure occurred, then it - * invokes the call-backs and returns a failed outcome. Otherwise, it returns the - * outcome that it received. + * invokes the call-backs, marks the controller complete, and returns an incomplete + * future, effectively halting the pipeline. Otherwise, it returns the outcome that it + * received. + * <p/> + * Assumes that no callbacks have been invoked yet. * * @param params operation parameters * @param controller pipeline controller * @return a function that checks the outcome status and continues, if successful, or * indicates a failure otherwise */ - private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure( - ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) { + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure( + ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) { return outcome -> { @@ -207,19 +189,21 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj // propagate "stop" to the callbacks controller.add(callbacks); - final ControlLoopOperation outcome2 = params.makeOutcome(); + final OperationOutcome outcome2 = params.makeOutcome(); // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) - outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString()); + outcome2.setResult(PolicyResult.FAILURE_GUARD); outcome2.setMessage(outcome != null ? outcome.getMessage() : null); - CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenApplyAsync(callbackCompleted(params, callbacks), executor) - .whenCompleteAsync(controller.delayedRemove(callbacks), executor) + // @formatter:off + CompletableFuture.completedFuture(outcome2) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .whenCompleteAsync(callbackCompleted(params, callbacks), executor) .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on - return controller; + return new CompletableFuture<>(); }; } @@ -237,8 +221,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return a function that will start the preprocessor and returns its outcome, or * {@code null} if this operation needs no preprocessor */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { return null; } @@ -251,20 +234,12 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt attempt number, typically starting with 1 * @return a future that will return the final result of all attempts */ - private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params, - PipelineControllerFuture<ControlLoopOperation> controller, int attempt) { - - final Executor executor = params.getExecutor(); - - CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt); + private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, int attempt) { // propagate "stop" to the operation attempt - controller.add(future); - - // detach when complete - future.whenCompleteAsync(controller.delayedRemove(future), executor) - .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor()) - .whenCompleteAsync(controller.delayedComplete(), executor); + controller.wrap(startAttemptWithoutRetries(params, attempt)) + .thenCompose(retryOnFailure(params, controller, attempt)); return controller; } @@ -276,40 +251,32 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt attempt number, typically starting with 1 * @return a future that will return the result of a single operation attempt */ - private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params, + private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params, int attempt) { logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); final Executor executor = params.getExecutor(); - final ControlLoopOperation outcome = params.makeOutcome(); + final OperationOutcome outcome = params.makeOutcome(); final CallbackManager callbacks = new CallbackManager(); // this operation attempt gets its own controller - final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); // propagate "stop" to the callbacks controller.add(callbacks); - /* - * Don't mark it complete until we've built the whole pipeline. This will prevent - * the operation from starting until after it has been successfully built (i.e., - * without generating any exceptions). - */ - final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>(); - // @formatter:off - CompletableFuture<ControlLoopOperation> future2 = - firstFuture.thenComposeAsync(verifyRunning(controller, params), executor) - .thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor); + CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2))); // @formatter:on // handle timeouts, if specified - long timeoutMillis = getTimeOutMillis(params.getPolicy()); + long timeoutMillis = getTimeOutMillis(params.getTimeoutSec()); if (timeoutMillis > 0) { logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId()); - future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); } /* @@ -321,16 +288,13 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj */ // @formatter:off - future2.exceptionally(fromException(params, outcome)) - .thenApplyAsync(setRetryFlag(params, attempt), executor) - .thenApplyAsync(callbackStarted(params, callbacks), executor) - .thenApplyAsync(callbackCompleted(params, callbacks), executor) + future.exceptionally(fromException(params, "operation")) + .thenApply(setRetryFlag(params, attempt)) + .whenCompleteAsync(callbackStarted(params, callbacks), executor) + .whenCompleteAsync(callbackCompleted(params, callbacks), executor) .whenCompleteAsync(controller.delayedComplete(), executor); // @formatter:on - // start the pipeline - firstFuture.complete(outcome); - return controller; } @@ -340,8 +304,8 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param outcome outcome to examine * @return {@code true} if the outcome was successful */ - protected boolean isSuccess(ControlLoopOperation outcome) { - return OUTCOME_SUCCESS.equals(outcome.getOutcome()); + protected boolean isSuccess(OperationOutcome outcome) { + return (outcome.getResult() == PolicyResult.SUCCESS); } /** @@ -351,13 +315,29 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return {@code true} if the outcome is not {@code null} and was a failure * <i>and</i> was associated with this operator, {@code false} otherwise */ - protected boolean isActorFailed(ControlLoopOperation outcome) { - return OUTCOME_FAILURE.equals(getActorOutcome(outcome)); + protected boolean isActorFailed(OperationOutcome outcome) { + return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); + } + + /** + * Determines if the given outcome is for this operation. + * + * @param outcome outcome to examine + * @return {@code true} if the outcome is for this operation, {@code false} otherwise + */ + protected boolean isSameOperation(OperationOutcome outcome) { + return OperationOutcome.isFor(outcome, getActorName(), getName()); } /** * Invokes the operation as a "future". This method simply invokes - * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future". + * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor + * "blocking executor"}, returning the result via a "future". + * <p/> + * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using + * the executor in the "params", as that may bring the background thread pool to a + * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used + * instead. * <p/> * This method assumes the following: * <ul> @@ -373,31 +353,23 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @return a function that will start the operation and return its result when * complete */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture( - ControlLoopOperationParams params, int attempt) { - - /* - * TODO As doOperation() may perform blocking I/O, this should be launched in its - * own thread to prevent the ForkJoinPool from being tied up. Should probably - * provide a method to make that easy. - */ + protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt, + OperationOutcome outcome) { - return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation), - params.getExecutor()); + return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor()); } /** * Low-level method that performs the operation. This can make the same assumptions * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This - * method throws an {@link UnsupportedOperationException}. + * particular method simply throws an {@link UnsupportedOperationException}. * * @param params operation parameters * @param attempt attempt number, typically starting with 1 * @param operation the operation being performed * @return the outcome of the operation */ - protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, - ControlLoopOperation operation) { + protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) { throw new UnsupportedOperationException("start operation " + getFullName()); } @@ -411,8 +383,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt latest attempt number, starting with 1 * @return a function to get the next future to execute */ - private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params, - int attempt) { + private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) { return operation -> { if (operation != null && !isActorFailed(operation)) { @@ -424,22 +395,22 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } // get a non-null operation - ControlLoopOperation oper2; + OperationOutcome oper2; if (operation != null) { oper2 = operation; } else { oper2 = params.makeOutcome(); - oper2.setOutcome(OUTCOME_FAILURE); + oper2.setResult(PolicyResult.FAILURE); } - if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0 - && attempt > params.getPolicy().getRetry()) { + Integer retry = params.getRetry(); + if (retry != null && retry > 0 && attempt > retry) { /* * retries were specified and we've already tried them all - change to * FAILURE_RETRIES */ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); - oper2.setOutcome(OUTCOME_RETRIES); + oper2.setResult(PolicyResult.FAILURE_RETRIES); } return oper2; @@ -456,21 +427,24 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param attempt latest attempt number, starting with 1 * @return a function to get the next future to execute */ - private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure( - ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller, + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure( + ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller, int attempt) { return operation -> { if (!isActorFailed(operation)) { // wrong type or wrong operation - just leave it as is logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + controller.complete(operation); + return new CompletableFuture<>(); } - if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) { + Integer retry = params.getRetry(); + if (retry == null || retry <= 0) { // no retries - already marked as FAILURE, so just return it logger.info("operation {} no retries for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + controller.complete(operation); + return new CompletableFuture<>(); } @@ -484,100 +458,279 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj } /** - * Gets the outcome of an operation for this operation. + * Converts an exception into an operation outcome, returning a copy of the outcome to + * prevent background jobs from changing it. * - * @param operation operation whose outcome is to be extracted - * @return the outcome of the given operation, if it's for this operator, {@code null} - * otherwise + * @param params operation parameters + * @param type type of item throwing the exception + * @return a function that will convert an exception into an operation outcome */ - protected String getActorOutcome(ControlLoopOperation operation) { - if (operation == null) { - return null; - } + private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) { - if (!getActorName().equals(operation.getActor())) { - return null; - } + return thrown -> { + OperationOutcome outcome = params.makeOutcome(); + + logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId(), thrown); + + return setOutcome(params, outcome, thrown); + }; + } + + /** + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params, + List<CompletableFuture<OperationOutcome>> futures) { + + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any + * outstanding futures when one completes. + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + final Executor executor = params.getExecutor(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + // @formatter:off + CompletableFuture.anyOf(futures) + .thenApply(object -> (OperationOutcome) object) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; + } + + /** + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels + * the futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params, + List<CompletableFuture<OperationOutcome>> futures) { - if (!getName().equals(operation.getOperation())) { - return null; + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = allOf(params, arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the + * futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param params operation parameters + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + + @SuppressWarnings("rawtypes") + CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + + // record the outcomes of each future when it completes + for (int count = 0; count < futures2.length; ++count) { + final int count2 = count; + futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); } - return operation.getOutcome(); + CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes)); + + return controller; } /** - * Gets a function that will start the next step, if the current operation was - * successful, or just return the current operation, otherwise. + * Attaches the given futures to the controller. + * + * @param controller master controller for all of the futures + * @param futures futures to be attached to the controller + */ + private void attachFutures(PipelineControllerFuture<OperationOutcome> controller, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + // attach each task + for (CompletableFuture<OperationOutcome> future : futures) { + controller.add(future); + } + } + + /** + * Combines the outcomes from a set of tasks. * * @param params operation parameters - * @param nextStep function that will invoke the next step, passing it the operation - * @return a function that will start the next step + * @param future future to be completed with the combined result + * @param outcomes outcomes to be examined */ - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess( - ControlLoopOperationParams params, - Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) { + private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params, + CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) { - return operation -> { + return (unused, thrown) -> { + if (thrown != null) { + future.completeExceptionally(thrown); + return; + } - if (operation == null) { - logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId()); - ControlLoopOperation outcome = params.makeOutcome(); - outcome.setOutcome(OUTCOME_FAILURE); - return CompletableFuture.completedFuture(outcome); + // identify the outcome with the highest priority + OperationOutcome outcome = outcomes[0]; + int priority = detmPriority(outcome); - } else if (isSuccess(operation)) { - logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId()); - return nextStep.apply(operation); + // start with "1", as we've already dealt with "0" + for (int count = 1; count < outcomes.length; ++count) { + OperationOutcome outcome2 = outcomes[count]; + int priority2 = detmPriority(outcome2); - } else { - logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(operation); + if (priority2 > priority) { + outcome = outcome2; + priority = priority2; + } } + + logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(), + (outcome == null ? null : outcome.getResult()), params.getRequestId()); + + future.complete(outcome); }; } /** - * Converts an exception into an operation outcome, returning a copy of the outcome to - * prevent background jobs from changing it. + * Determines the priority of an outcome based on its result. * - * @param params operation parameters - * @param operation current operation - * @return a function that will convert an exception into an operation outcome + * @param outcome outcome to examine, or {@code null} + * @return the outcome's priority */ - private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params, - ControlLoopOperation operation) { + protected int detmPriority(OperationOutcome outcome) { + if (outcome == null) { + return 1; + } - return thrown -> { - logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(), - params.getRequestId(), thrown); + switch (outcome.getResult()) { + case SUCCESS: + return 0; + + case FAILURE_GUARD: + return 2; + + case FAILURE_RETRIES: + return 3; + + case FAILURE: + return 4; + + case FAILURE_TIMEOUT: + return 5; + + case FAILURE_EXCEPTION: + default: + return 6; + } + } + + /** + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. + * + * @param params operation parameters + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param outcome outcome of the previous task + * @param tasks tasks to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task's future. Otherwise, it returns an incomplete future and completes + * the controller instead. + */ + // @formatter:off + protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, + boolean checkSuccess, OperationOutcome outcome, + CompletableFuture<OperationOutcome> task) { + // @formatter:on + if (checkSuccess && !isSuccess(outcome)) { /* - * Must make a copy of the operation, as the original could be changed by - * background jobs that might still be running. + * must complete before canceling so that cancel() doesn't cause controller to + * complete */ - return setOutcome(params, new ControlLoopOperation(operation), thrown); - }; + controller.complete(outcome); + task.cancel(false); + return new CompletableFuture<>(); + } + + return controller.wrap(task); } /** - * Gets a function to verify that the operation is still running. If the pipeline is - * not running, then it returns an incomplete future, which will effectively halt - * subsequent operations in the pipeline. This method is intended to be used with one - * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods. + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. * - * @param controller pipeline controller * @param params operation parameters - * @return a function to verify that the operation is still running + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param tasks tasks to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task's future. Otherwise, it returns an incomplete future and completes + * the controller instead. */ - protected <T> Function<T, CompletableFuture<T>> verifyRunning( - PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) { + // @formatter:off + protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params, + PipelineControllerFuture<OperationOutcome> controller, + boolean checkSuccess, + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) { + // @formatter:on + + return outcome -> { + + if (!controller.isRunning()) { + return new CompletableFuture<>(); + } - return value -> { - boolean running = controller.isRunning(); - logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId()); + if (checkSuccess && !isSuccess(outcome)) { + controller.complete(outcome); + return new CompletableFuture<>(); + } - return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>()); + return controller.wrap(task.apply(outcome)); }; } @@ -591,10 +744,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param callbacks used to determine if the start callback can be invoked * @return a function that sets the start time and invokes the callback */ - private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params, + private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params, CallbackManager callbacks) { - return outcome -> { + return (outcome, thrown) -> { if (callbacks.canStart()) { // haven't invoked "start" callback yet @@ -602,8 +755,6 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj outcome.setEnd(null); params.callbackStarted(outcome); } - - return outcome; }; } @@ -621,18 +772,16 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param callbacks used to determine if the end callback can be invoked * @return a function that sets the end time and invokes the callback */ - private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params, + private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params, CallbackManager callbacks) { - return operation -> { + return (outcome, thrown) -> { if (callbacks.canEnd()) { - operation.setStart(callbacks.getStartTime()); - operation.setEnd(callbacks.getEndTime()); - params.callbackCompleted(operation); + outcome.setStart(callbacks.getStartTime()); + outcome.setEnd(callbacks.getEndTime()); + params.callbackCompleted(outcome); } - - return operation; }; } @@ -643,7 +792,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param operation operation to be updated * @return the updated operation */ - protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation, + protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation, Throwable thrown) { PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); return setOutcome(params, operation, result); @@ -657,10 +806,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * @param result result of the operation * @return the updated operation */ - protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation, + protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation, PolicyResult result) { logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); - operation.setOutcome(result.toString()); + operation.setResult(result); operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG : ControlLoopOperation.FAILED_MSG); @@ -687,71 +836,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj * Gets the operation timeout. Subclasses may override this method to obtain the * timeout in some other way (e.g., through configuration properties). * - * @param policy policy from which to extract the timeout + * @param timeoutSec timeout, in seconds, or {@code null} * @return the operation timeout, in milliseconds */ - protected long getTimeOutMillis(Policy policy) { - Integer timeoutSec = policy.getTimeout(); + protected long getTimeOutMillis(Integer timeoutSec) { return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); } - - /** - * Manager for "start" and "end" callbacks. - */ - private static class CallbackManager implements Runnable { - private final AtomicReference<Instant> startTime = new AtomicReference<>(); - private final AtomicReference<Instant> endTime = new AtomicReference<>(); - - /** - * Determines if the "start" callback can be invoked. If so, it sets the - * {@link #startTime} to the current time. - * - * @return {@code true} if the "start" callback can be invoked, {@code false} - * otherwise - */ - public boolean canStart() { - return startTime.compareAndSet(null, Instant.now()); - } - - /** - * Determines if the "end" callback can be invoked. If so, it sets the - * {@link #endTime} to the current time. - * - * @return {@code true} if the "end" callback can be invoked, {@code false} - * otherwise - */ - public boolean canEnd() { - return endTime.compareAndSet(null, Instant.now()); - } - - /** - * Gets the start time. - * - * @return the start time, or {@code null} if {@link #canStart()} has not been - * invoked yet. - */ - public Instant getStartTime() { - return startTime.get(); - } - - /** - * Gets the end time. - * - * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked - * yet. - */ - public Instant getEndTime() { - return endTime.get(); - } - - /** - * Prevents further callbacks from being executed by setting {@link #startTime} - * and {@link #endTime}. - */ - @Override - public void run() { - canStart(); - canEnd(); - } - } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java index 08aba81f2..57fce40d7 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java @@ -20,6 +20,7 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -32,11 +33,11 @@ import lombok.Getter; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.BeanValidator; import org.onap.policy.common.parameters.annotations.NotNull; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Util; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; -import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.Target; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,36 +50,67 @@ import org.slf4j.LoggerFactory; @AllArgsConstructor @EqualsAndHashCode public class ControlLoopOperationParams { - private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class); - public static final String UNKNOWN = "-unknown-"; - + /** + * Actor name. + */ + @NotNull + private String actor; /** - * The actor service in which to find the actor/operation. + * Actor service in which to find the actor/operation. */ @NotNull private ActorService actorService; /** - * The event for which the operation applies. + * Event for which the operation applies. */ @NotNull private ControlLoopEventContext context; /** - * The executor to use to run the operation. + * Executor to use to run the operation. */ @NotNull @Builder.Default private Executor executor = ForkJoinPool.commonPool(); /** - * The policy associated with the operation. + * Operation name. + */ + @NotNull + private String operation; + + /** + * Payload data for the request. + */ + private Map<String, String> payload; + + /** + * Number of retries allowed, or {@code null} if no retries. + */ + private Integer retry; + + /** + * The entity's target information. May be {@code null}, depending on the requirement + * of the operation to be invoked. + */ + private Target target; + + /** + * Target entity. */ @NotNull - private Policy policy; + private String targetEntity; + + /** + * Timeout, in seconds, or {@code null} if no timeout. Zero and negative values also + * imply no timeout. + */ + @Builder.Default + private Integer timeoutSec = 300; /** * The function to invoke when the operation starts. This is optional. @@ -87,7 +119,7 @@ public class ControlLoopOperationParams { * may happen if the current operation requires other operations to be performed first * (e.g., A&AI queries, guard checks). */ - private Consumer<ControlLoopOperation> startCallback; + private Consumer<OperationOutcome> startCallback; /** * The function to invoke when the operation completes. This is optional. @@ -96,13 +128,7 @@ public class ControlLoopOperationParams { * may happen if the current operation requires other operations to be performed first * (e.g., A&AI queries, guard checks). */ - private Consumer<ControlLoopOperation> completeCallback; - - /** - * Target entity. - */ - @NotNull - private String target; + private Consumer<OperationOutcome> completeCallback; /** * Starts the specified operation. @@ -110,7 +136,7 @@ public class ControlLoopOperationParams { * @return a future that will return the result of the operation * @throws IllegalArgumentException if the parameters are invalid */ - public CompletableFuture<ControlLoopOperation> start() { + public CompletableFuture<OperationOutcome> start() { BeanValidationResult result = validate(); if (!result.isValid()) { logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(), @@ -120,31 +146,13 @@ public class ControlLoopOperationParams { // @formatter:off return actorService - .getActor(policy.getActor()) - .getOperator(policy.getRecipe()) + .getActor(getActor()) + .getOperator(getOperation()) .startOperation(this); // @formatter:on } /** - * Gets the name of the actor from the policy. - * - * @return the actor name, or {@link #UNKNOWN} if no name is available - */ - public String getActor() { - return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor()); - } - - /** - * Gets the name of the operation from the policy. - * - * @return the operation name, or {@link #UNKNOWN} if no name is available - */ - public String getOperation() { - return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe()); - } - - /** * Gets the requested ID of the associated event. * * @return the event's request ID, or {@code null} if no request ID is available @@ -158,13 +166,13 @@ public class ControlLoopOperationParams { * * @return a new operation outcome */ - public ControlLoopOperation makeOutcome() { - ControlLoopOperation operation = new ControlLoopOperation(); - operation.setActor(getActor()); - operation.setOperation(getOperation()); - operation.setTarget(target); + public OperationOutcome makeOutcome() { + OperationOutcome outcome = new OperationOutcome(); + outcome.setActor(getActor()); + outcome.setOperation(getOperation()); + outcome.setTarget(targetEntity); - return operation; + return outcome; } /** @@ -173,11 +181,11 @@ public class ControlLoopOperationParams { * * @param operation the operation that is being started */ - public void callbackStarted(ControlLoopOperation operation) { + public void callbackStarted(OperationOutcome operation) { logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId()); if (startCallback != null) { - Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", + Util.runFunction(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}", operation.getActor(), operation.getOperation(), getRequestId()); } } @@ -188,12 +196,12 @@ public class ControlLoopOperationParams { * * @param operation the operation that is being started */ - public void callbackCompleted(ControlLoopOperation operation) { + public void callbackCompleted(OperationOutcome operation) { logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(), - operation.getOutcome(), getRequestId()); + operation.getResult(), getRequestId()); if (completeCallback != null) { - Util.logException(() -> completeCallback.accept(operation), + Util.runFunction(() -> completeCallback.accept(operation), "{}.{}: complete-callback threw an exception for {}", operation.getActor(), operation.getOperation(), getRequestId()); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java index d34a3fb5b..1d64a8710 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java @@ -101,7 +101,7 @@ public class ListenerManager { */ protected void runListener(Runnable listener) { // TODO do this asynchronously? - Util.logException(listener, "pipeline listener {} threw an exception", listener); + Util.runFunction(listener, "pipeline listener {} threw an exception", listener); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java index 96c8f9e05..92843e28a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java @@ -23,41 +23,70 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline; import static org.onap.policy.controlloop.actorserviceprovider.Util.ident; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import lombok.NoArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Pipeline controller, used by operations within the pipeline to determine if they should - * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the - * pipeline. + * continue to run. Whenever this is canceled or completed, it automatically cancels all + * futures and runs all listeners that have been added. */ @NoArgsConstructor public class PipelineControllerFuture<T> extends CompletableFuture<T> { private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class); + private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception"; + private static final String CANCEL_MSG = "{}: cancel future"; + private static final String COMPLETE_MSG = "{}: complete future"; + /** * Tracks items added to this controller via one of the <i>add</i> methods. */ private final FutureManager futures = new FutureManager(); - /** - * Cancels and stops the pipeline, in that order. - */ @Override public boolean cancel(boolean mayInterruptIfRunning) { - try { - logger.trace("{}: cancel future", ident(this)); - return super.cancel(mayInterruptIfRunning); + return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this)); + } - } finally { - futures.stop(); - } + @Override + public boolean complete(T value) { + return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this)); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this)); + } + + @Override + public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor); + } + + @Override + public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this))); + } + + @Override + public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) { + logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit); + return super.completeOnTimeout(value, timeout, unit); + } + + @Override + public <U> PipelineControllerFuture<U> newIncompleteFuture() { + return new PipelineControllerFuture<>(); } /** @@ -67,11 +96,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { * * @return a function that removes the given future */ - public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) { - return (value, thrown) -> { - logger.trace("{}: remove future {}", ident(this), ident(future)); - remove(future); - }; + public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) { + return (value, thrown) -> remove(future); } /** @@ -81,11 +107,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { * * @return a function that removes the given listener */ - public BiConsumer<T, Throwable> delayedRemove(Runnable listener) { - return (value, thrown) -> { - logger.trace("{}: remove listener {}", ident(this), ident(listener)); - remove(listener); - }; + public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) { + return (value, thrown) -> remove(listener); } /** @@ -98,25 +121,43 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { public BiConsumer<T, Throwable> delayedComplete() { return (value, thrown) -> { if (thrown == null) { - logger.trace("{}: complete and stop future", ident(this)); complete(value); } else { - logger.trace("{}: complete exceptionally and stop future", ident(this)); completeExceptionally(thrown); } - - futures.stop(); }; } /** + * Adds a future to the controller and arranges for it to be removed from the + * controller when it completes, whether or not it throws an exception. If the + * controller has already been stopped, then the future is canceled and a new, + * incomplete future is returned. + * + * @param future future to be wrapped + * @return a new future + */ + public CompletableFuture<T> wrap(CompletableFuture<T> future) { + if (!isRunning()) { + logger.trace("{}: not running, skipping next task {}", ident(this), ident(future)); + future.cancel(false); + return new CompletableFuture<>(); + } + + add(future); + return future.whenComplete(this.delayedRemove(future)); + } + + /** * Adds a function whose return value is to be canceled when this controller is * stopped. Note: if the controller is already stopped, then the function will * <i>not</i> be executed. * - * @param futureMaker function to be invoked in the future + * @param futureMaker function to be invoked to create the future + * @return a function to create the future and arrange for it to be managed by this + * controller */ - public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) { + public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) { return input -> { if (!isRunning()) { @@ -127,7 +168,7 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { CompletableFuture<F> future = futureMaker.apply(input); add(future); - return future; + return future.whenComplete(delayedRemove(future)); }; } @@ -154,4 +195,26 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> { logger.trace("{}: remove listener {}", ident(this), ident(listener)); futures.remove(listener); } + + /** + * Performs an operation, stops the futures, and returns the value from the operation. + * Logs a message using the given arguments. + * + * + * @param <R> type of value to be returned + * @param supplier operation to perform + * @param message message to be logged + * @param args message arguments to fill "{}" place-holders + * @return the operation's result + */ + private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) { + try { + logger.trace(message, args); + return supplier.get(); + + } finally { + logger.trace("{}: stopping this future", ident(this)); + futures.stop(); + } + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java new file mode 100644 index 000000000..31c6d2077 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java @@ -0,0 +1,172 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.policy.PolicyResult; + +public class AsyncResponseHandlerTest { + + private static final String ACTOR = "my-actor"; + private static final String OPERATION = "my-operation"; + private static final UUID REQ_ID = UUID.randomUUID(); + private static final String TEXT = "some text"; + + private VirtualControlLoopEvent event; + private ControlLoopEventContext context; + private ControlLoopOperationParams params; + private OperationOutcome outcome; + private MyHandler handler; + + /** + * Initializes all fields, including {@link #handler}. + */ + @Before + public void setUp() { + event = new VirtualControlLoopEvent(); + event.setRequestId(REQ_ID); + + context = new ControlLoopEventContext(event); + params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).context(context).build(); + outcome = params.makeOutcome(); + + handler = new MyHandler(params, outcome); + } + + @Test + public void testAsyncResponseHandler_testGetParams_testGetOutcome() { + assertSame(params, handler.getParams()); + assertSame(outcome, handler.getOutcome()); + } + + @Test + public void testHandle() { + CompletableFuture<String> future = new CompletableFuture<>(); + handler.handle(future).complete(outcome); + + assertTrue(future.isCancelled()); + } + + @Test + public void testCompleted() throws Exception { + CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>()); + handler.completed(TEXT); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + assertEquals(PolicyResult.FAILURE_RETRIES, outcome.getResult()); + assertEquals(TEXT, outcome.getMessage()); + } + + /** + * Tests completed() when doCompleted() throws an exception. + */ + @Test + public void testCompletedException() throws Exception { + IllegalStateException except = new IllegalStateException(); + + outcome = params.makeOutcome(); + handler = new MyHandler(params, outcome) { + @Override + protected OperationOutcome doComplete(String rawResponse) { + throw except; + } + }; + + CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>()); + handler.completed(TEXT); + assertTrue(result.isCompletedExceptionally()); + + AtomicReference<Throwable> thrown = new AtomicReference<>(); + result.whenComplete((unused, thrown2) -> thrown.set(thrown2)); + + assertSame(except, thrown.get()); + } + + @Test + public void testFailed() throws Exception { + IllegalStateException except = new IllegalStateException(); + + CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>()); + handler.failed(except); + + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + assertEquals(PolicyResult.FAILURE_GUARD, outcome.getResult()); + } + + /** + * Tests failed() when doFailed() throws an exception. + */ + @Test + public void testFailedException() throws Exception { + IllegalStateException except = new IllegalStateException(); + + outcome = params.makeOutcome(); + handler = new MyHandler(params, outcome) { + @Override + protected OperationOutcome doFailed(Throwable thrown) { + throw except; + } + }; + + CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>()); + handler.failed(except); + assertTrue(result.isCompletedExceptionally()); + + AtomicReference<Throwable> thrown = new AtomicReference<>(); + result.whenComplete((unused, thrown2) -> thrown.set(thrown2)); + + assertSame(except, thrown.get()); + } + + private class MyHandler extends AsyncResponseHandler<String> { + + public MyHandler(ControlLoopOperationParams params, OperationOutcome outcome) { + super(params, outcome); + } + + @Override + protected OperationOutcome doComplete(String rawResponse) { + OperationOutcome outcome = getOutcome(); + outcome.setResult(PolicyResult.FAILURE_RETRIES); + outcome.setMessage(rawResponse); + return outcome; + } + + @Override + protected OperationOutcome doFailed(Throwable thrown) { + OperationOutcome outcome = getOutcome(); + outcome.setResult(PolicyResult.FAILURE_GUARD); + return outcome; + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java new file mode 100644 index 000000000..44606cb14 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.time.Instant; +import org.junit.Before; +import org.junit.Test; + +public class CallbackManagerTest { + + private CallbackManager mgr; + + @Before + public void setUp() { + mgr = new CallbackManager(); + } + + @Test + public void testCanStart_testGetStartTime() { + // null until canXxx() is called + assertNull(mgr.getStartTime()); + + assertTrue(mgr.canStart()); + + Instant time = mgr.getStartTime(); + assertNotNull(time); + assertNull(mgr.getEndTime()); + + // false for now on + assertFalse(mgr.canStart()); + assertFalse(mgr.canStart()); + + assertEquals(time, mgr.getStartTime()); + } + + @Test + public void testCanEnd_testGetEndTime() { + // null until canXxx() is called + assertNull(mgr.getEndTime()); + assertNull(mgr.getEndTime()); + + assertTrue(mgr.canEnd()); + + Instant time = mgr.getEndTime(); + assertNotNull(time); + assertNull(mgr.getStartTime()); + + // false for now on + assertFalse(mgr.canEnd()); + assertFalse(mgr.canEnd()); + + assertEquals(time, mgr.getEndTime()); + } + + @Test + public void testRun() { + mgr.run(); + + assertNotNull(mgr.getStartTime()); + assertNotNull(mgr.getEndTime()); + + assertFalse(mgr.canStart()); + assertFalse(mgr.canEnd()); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java new file mode 100644 index 000000000..4e9728336 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java @@ -0,0 +1,137 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.policy.PolicyResult; + +public class OperationOutcomeTest { + private static final String ACTOR = "my-actor"; + private static final String OPERATION = "my-operation"; + private static final String TARGET = "my-target"; + private static final Instant START = Instant.ofEpochMilli(10); + private static final Instant END = Instant.ofEpochMilli(20); + private static final String SUB_REQ_ID = "my-sub-request-id"; + private static final PolicyResult RESULT = PolicyResult.FAILURE_GUARD; + private static final String MESSAGE = "my-message"; + + private OperationOutcome outcome; + + @Before + public void setUp() { + outcome = new OperationOutcome(); + } + + @Test + public void testOperationOutcomeOperationOutcome() { + setAll(); + + OperationOutcome outcome2 = new OperationOutcome(outcome); + + assertEquals(ACTOR, outcome2.getActor()); + assertEquals(OPERATION, outcome2.getOperation()); + assertEquals(TARGET, outcome2.getTarget()); + assertEquals(START, outcome2.getStart()); + assertEquals(END, outcome2.getEnd()); + assertEquals(SUB_REQ_ID, outcome2.getSubRequestId()); + assertEquals(RESULT, outcome2.getResult()); + assertEquals(MESSAGE, outcome2.getMessage()); + } + + @Test + public void testToControlLoopOperation() { + setAll(); + + ControlLoopOperation outcome2 = outcome.toControlLoopOperation(); + + assertEquals(ACTOR, outcome2.getActor()); + assertEquals(OPERATION, outcome2.getOperation()); + assertEquals(TARGET, outcome2.getTarget()); + assertEquals(START, outcome2.getStart()); + assertEquals(END, outcome2.getEnd()); + assertEquals(SUB_REQ_ID, outcome2.getSubRequestId()); + assertEquals(RESULT.toString(), outcome2.getOutcome()); + assertEquals(MESSAGE, outcome2.getMessage()); + } + + /** + * Tests both isFor() methods, as one invokes the other. + */ + @Test + public void testIsFor() { + setAll(); + + // null case + assertFalse(OperationOutcome.isFor(null, ACTOR, OPERATION)); + + // actor mismatch + assertFalse(OperationOutcome.isFor(outcome, TARGET, OPERATION)); + + // operation mismatch + assertFalse(OperationOutcome.isFor(outcome, ACTOR, TARGET)); + + // null actor in outcome + outcome.setActor(null); + assertFalse(OperationOutcome.isFor(outcome, ACTOR, OPERATION)); + outcome.setActor(ACTOR); + + // null operation in outcome + outcome.setOperation(null); + assertFalse(OperationOutcome.isFor(outcome, ACTOR, OPERATION)); + outcome.setOperation(OPERATION); + + // null actor argument + assertThatThrownBy(() -> outcome.isFor(null, OPERATION)); + + // null operation argument + assertThatThrownBy(() -> outcome.isFor(ACTOR, null)); + + // true case + assertTrue(OperationOutcome.isFor(outcome, ACTOR, OPERATION)); + } + + @Test + public void testSetResult() { + outcome.setResult(PolicyResult.FAILURE_EXCEPTION); + assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult()); + + assertThatThrownBy(() -> outcome.setResult(null)); + } + + private void setAll() { + outcome.setActor(ACTOR); + outcome.setEnd(END); + outcome.setMessage(MESSAGE); + outcome.setOperation(OPERATION); + outcome.setResult(RESULT); + outcome.setStart(START); + outcome.setSubRequestId(SUB_REQ_ID); + outcome.setTarget(TARGET); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java index c652e8374..4a3f321cf 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java @@ -27,15 +27,56 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import ch.qos.logback.classic.Logger; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import lombok.Builder; import lombok.Data; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.slf4j.LoggerFactory; public class UtilTest { + private static final String MY_REQUEST = "my-request"; + private static final String URL = "my-url"; + private static final String OUT_URL = "OUT|REST|my-url"; + private static final String IN_URL = "IN|REST|my-url"; + protected static final String EXPECTED_EXCEPTION = "expected exception"; + + /** + * Used to attach an appender to the class' logger. + */ + private static final Logger logger = (Logger) LoggerFactory.getLogger(Util.class); + private static final ExtractAppender appender = new ExtractAppender(); + + /** + * Initializes statics. + */ + @BeforeClass + public static void setUpBeforeClass() { + appender.setContext(logger.getLoggerContext()); + appender.start(); + + logger.addAppender(appender); + } + + @AfterClass + public static void tearDownAfterClass() { + appender.stop(); + } + + @Before + public void setUp() { + appender.clearExtractions(); + } @Test public void testIdent() { @@ -48,11 +89,88 @@ public class UtilTest { } @Test - public void testLogException() { + public void testLogRestRequest() throws CoderException { + // log structured data + appender.clearExtractions(); + Util.logRestRequest(URL, new Abc(10, null, null)); + List<String> output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(OUT_URL).contains("{\n \"intValue\": 10\n}"); + + // log a plain string + appender.clearExtractions(); + Util.logRestRequest(URL, MY_REQUEST); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(OUT_URL).contains(MY_REQUEST); + + // exception from coder + StandardCoder coder = new StandardCoder() { + @Override + public String encode(Object object, boolean pretty) throws CoderException { + throw new CoderException(EXPECTED_EXCEPTION); + } + }; + + appender.clearExtractions(); + Util.logRestRequest(coder, URL, new Abc(11, null, null)); + output = appender.getExtracted(); + assertEquals(2, output.size()); + assertThat(output.get(0)).contains("cannot pretty-print request"); + assertThat(output.get(1)).contains(OUT_URL); + } + + @Test + public void testLogRestResponse() throws CoderException { + // log structured data + appender.clearExtractions(); + Util.logRestResponse(URL, new Abc(10, null, null)); + List<String> output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(IN_URL).contains("{\n \"intValue\": 10\n}"); + + // log null response + appender.clearExtractions(); + Util.logRestResponse(URL, null); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(IN_URL).contains("null"); + + // log a plain string + appender.clearExtractions(); + Util.logRestResponse(URL, MY_REQUEST); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(IN_URL).contains(MY_REQUEST); + + // exception from coder + StandardCoder coder = new StandardCoder() { + @Override + public String encode(Object object, boolean pretty) throws CoderException { + throw new CoderException(EXPECTED_EXCEPTION); + } + }; + + appender.clearExtractions(); + Util.logRestResponse(coder, URL, new Abc(11, null, null)); + output = appender.getExtracted(); + assertEquals(2, output.size()); + assertThat(output.get(0)).contains("cannot pretty-print response"); + assertThat(output.get(1)).contains(IN_URL); + } + + @Test + public void testRunFunction() { // no exception, no log AtomicInteger count = new AtomicInteger(); - Util.logException(() -> count.incrementAndGet(), "no error"); + Util.runFunction(() -> count.incrementAndGet(), "no error"); assertEquals(1, count.get()); + assertEquals(0, appender.getExtracted().size()); // with an exception Runnable runnable = () -> { @@ -60,8 +178,17 @@ public class UtilTest { throw new IllegalStateException("expected exception"); }; - Util.logException(runnable, "error with no args"); - Util.logException(runnable, "error {} {} arg(s)", "with", 1); + appender.clearExtractions(); + Util.runFunction(runnable, "error with no args"); + List<String> output = appender.getExtracted(); + assertEquals(1, output.size()); + assertThat(output.get(0)).contains("error with no args"); + + appender.clearExtractions(); + Util.runFunction(runnable, "error {} {} arg(s)", "with", 2); + output = appender.getExtracted(); + assertEquals(1, output.size()); + assertThat(output.get(0)).contains("error with 2 arg(s)"); } @Test @@ -123,4 +250,14 @@ public class UtilTest { private int intValue; private String strValue; } + + // throws an exception when getXxx() is used + public static class DataWithException { + @SuppressWarnings("unused") + private int intValue; + + public int getIntValue() { + throw new IllegalStateException(); + } + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java index fcc3fb12e..0d917ad3e 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java @@ -20,28 +20,55 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import java.util.Map; +import java.util.UUID; import org.junit.Before; import org.junit.Test; import org.onap.policy.controlloop.VirtualControlLoopEvent; public class ControlLoopEventContextTest { + private static final UUID REQ_ID = UUID.randomUUID(); + private Map<String, String> enrichment; private VirtualControlLoopEvent event; private ControlLoopEventContext context; + /** + * Initializes data, including {@link #context}. + */ @Before public void setUp() { + enrichment = Map.of("abc", "one", "def", "two"); + event = new VirtualControlLoopEvent(); + event.setRequestId(REQ_ID); + event.setAai(enrichment); + context = new ControlLoopEventContext(event); } @Test public void testControlLoopEventContext() { assertSame(event, context.getEvent()); + assertSame(REQ_ID, context.getRequestId()); + assertEquals(enrichment, context.getEnrichment()); + + // null event + assertThatThrownBy(() -> new ControlLoopEventContext(null)); + + // no request id, no enrichment data + event.setRequestId(null); + event.setAai(null); + context = new ControlLoopEventContext(event); + assertSame(event, context.getEvent()); + assertNotNull(context.getRequestId()); + assertEquals(Map.of(), context.getEnrichment()); } @Test diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java index 7e0c35a3f..a209fb0d8 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java @@ -34,7 +34,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -192,10 +191,10 @@ public class ActorImplTest { } @Test - public void testSetOperators() { - // cannot set operators if already configured + public void testAddOperator() { + // cannot add operators if already configured actor.configure(params); - assertThatIllegalStateException().isThrownBy(() -> actor.setOperators(Collections.emptyList())); + assertThatIllegalStateException().isThrownBy(() -> actor.addOperator(oper1)); /* * make an actor where operators two and four have names that are duplicates of @@ -367,7 +366,13 @@ public class ActorImplTest { * @return a new actor */ private ActorImpl makeActor(Operator... operators) { - return new ActorImpl(ACTOR_NAME, operators); + ActorImpl actor = new ActorImpl(ACTOR_NAME); + + for (Operator oper : operators) { + actor.addOperator(oper); + } + + return actor; } private static class MyOper extends OperatorPartial implements Operator { diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java new file mode 100644 index 000000000..2da789989 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Function; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +public class HttpActorTest { + + private static final String ACTOR = "my-actor"; + private static final String UNKNOWN = "unknown"; + private static final String CLIENT = "my-client"; + private static final long TIMEOUT = 10L; + + private HttpActor actor; + + @Before + public void setUp() { + actor = new HttpActor(ACTOR); + } + + @Test + public void testMakeOperatorParameters() { + HttpActorParams params = new HttpActorParams(); + params.setClientName(CLIENT); + params.setTimeoutSec(TIMEOUT); + params.setPath(Map.of("operA", "urlA", "operB", "urlB")); + + final HttpActor prov = new HttpActor(ACTOR); + Function<String, Map<String, Object>> maker = + prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params)); + + assertNull(maker.apply(UNKNOWN)); + + // use a TreeMap to ensure the properties are sorted + assertEquals("{clientName=my-client, path=urlA, timeoutSec=10}", + new TreeMap<>(maker.apply("operA")).toString()); + + assertEquals("{clientName=my-client, path=urlB, timeoutSec=10}", + new TreeMap<>(maker.apply("operB")).toString()); + + // with invalid actor parameters + params.setClientName(null); + assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params))) + .isInstanceOf(ParameterValidationRuntimeException.class); + } + + @Test + public void testHttpActor() { + assertEquals(ACTOR, actor.getName()); + assertEquals(ACTOR, actor.getFullName()); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java new file mode 100644 index 000000000..c006cf333 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.onap.policy.common.endpoints.http.client.HttpClientFactory; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +public class HttpOperatorTest { + + private static final String ACTOR = "my-actor"; + private static final String OPERATION = "my-name"; + private static final String CLIENT = "my-client"; + private static final String PATH = "my-path"; + private static final long TIMEOUT = 100; + + @Mock + private HttpClient client; + + private HttpOperator oper; + + /** + * Initializes fields, including {@link #oper}. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + oper = new HttpOperator(ACTOR, OPERATION); + } + + @Test + public void testDoConfigureMapOfStringObject_testGetClient_testGetPath_testGetTimeoutSec() { + assertNull(oper.getClient()); + assertNull(oper.getPath()); + assertEquals(0L, oper.getTimeoutSec()); + + oper = new HttpOperator(ACTOR, OPERATION) { + @Override + protected HttpClientFactory getClientFactory() { + HttpClientFactory factory = mock(HttpClientFactory.class); + when(factory.get(CLIENT)).thenReturn(client); + return factory; + } + }; + + HttpParams params = HttpParams.builder().clientName(CLIENT).path(PATH).timeoutSec(TIMEOUT).build(); + Map<String, Object> paramMap = Util.translateToMap(OPERATION, params); + oper.configure(paramMap); + + assertSame(client, oper.getClient()); + assertEquals(PATH, oper.getPath()); + assertEquals(TIMEOUT, oper.getTimeoutSec()); + + // test invalid parameters + paramMap.remove("path"); + assertThatThrownBy(() -> oper.configure(paramMap)).isInstanceOf(ParameterValidationRuntimeException.class); + } + + @Test + public void testHttpOperator() { + assertEquals(ACTOR, oper.getActorName()); + assertEquals(OPERATION, oper.getName()); + assertEquals(ACTOR + "." + OPERATION, oper.getFullName()); + } + + @Test + public void testGetClient() { + assertNotNull(oper.getClientFactory()); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java index 864ac829a..21bc656f2 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java @@ -29,6 +29,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.time.Instant; @@ -36,17 +37,20 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -58,9 +62,10 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; import org.onap.policy.controlloop.policy.PolicyResult; public class OperatorPartialTest { @@ -75,14 +80,10 @@ public class OperatorPartialTest { private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream() .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList()); - private static final List<String> FAILURE_STRINGS = - FAILURE_RESULTS.stream().map(Object::toString).collect(Collectors.toList()); - private VirtualControlLoopEvent event; private Map<String, Object> config; private ControlLoopEventContext context; private MyExec executor; - private Policy policy; private ControlLoopOperationParams params; private MyOper oper; @@ -92,8 +93,8 @@ public class OperatorPartialTest { private Instant tstart; - private ControlLoopOperation opstart; - private ControlLoopOperation opend; + private OperationOutcome opstart; + private OperationOutcome opend; /** * Initializes the fields, including {@link #oper}. @@ -107,13 +108,9 @@ public class OperatorPartialTest { context = new ControlLoopEventContext(event); executor = new MyExec(); - policy = new Policy(); - policy.setActor(ACTOR); - policy.setRecipe(OPERATOR); - policy.setTimeout(TIMEOUT); - params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context) - .executor(executor).policy(policy).startCallback(this::starter).target(TARGET).build(); + .executor(executor).actor(ACTOR).operation(OPERATOR).timeoutSec(TIMEOUT) + .startCallback(this::starter).targetEntity(TARGET).build(); oper = new MyOper(); oper.configure(new TreeMap<>()); @@ -133,6 +130,31 @@ public class OperatorPartialTest { } @Test + public void testGetBlockingExecutor() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + /* + * Use an operator that doesn't override getBlockingExecutor(). + */ + OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {}; + oper2.getBlockingExecutor().execute(() -> latch.countDown()); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testDoConfigure() { + oper = spy(new MyOper()); + + oper.configure(config); + verify(oper).configure(config); + + // repeat - SHOULD be run again + oper.configure(config); + verify(oper, times(2)).configure(config); + } + + @Test public void testDoStart() { oper = spy(new MyOper()); @@ -181,7 +203,7 @@ public class OperatorPartialTest { } @Test - public void testStartOperation_testVerifyRunning() { + public void testStartOperation() { verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS); } @@ -201,17 +223,13 @@ public class OperatorPartialTest { * Tests startOperation() when the operation has a preprocessor. */ @Test - public void testStartOperationWithPreprocessor_testStartPreprocessor() { + public void testStartOperationWithPreprocessor() { AtomicInteger count = new AtomicInteger(); - // @formatter:off - Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc = - oper -> CompletableFuture.supplyAsync(() -> { - count.incrementAndGet(); - oper.setOutcome(PolicyResult.SUCCESS.toString()); - return oper; - }, executor); - // @formatter:on + CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> { + count.incrementAndGet(); + return makeSuccess(); + }, executor); oper.setPreProcessor(preproc); @@ -233,7 +251,7 @@ public class OperatorPartialTest { assertNotNull(opstart); assertNotNull(opend); - assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome()); + assertEquals(PolicyResult.SUCCESS, opend.getResult()); assertEquals(MAX_PARALLEL_REQUESTS, numStart); assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount()); @@ -245,11 +263,7 @@ public class OperatorPartialTest { */ @Test public void testStartPreprocessorFailure() { - // arrange for the preprocessor to return a failure - oper.setPreProcessor(oper -> { - oper.setOutcome(PolicyResult.FAILURE_GUARD.toString()); - return CompletableFuture.completedFuture(oper); - }); + oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure())); verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD); } @@ -260,9 +274,7 @@ public class OperatorPartialTest { @Test public void testStartPreprocessorException() { // arrange for the preprocessor to throw an exception - oper.setPreProcessor(oper -> { - throw new IllegalStateException(EXPECTED_EXCEPTION); - }); + oper.setPreProcessor(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION))); verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD); } @@ -273,10 +285,7 @@ public class OperatorPartialTest { @Test public void testStartPreprocessorNotRunning() { // arrange for the preprocessor to return success, which will be ignored - oper.setPreProcessor(oper -> { - oper.setOutcome(PolicyResult.SUCCESS.toString()); - return CompletableFuture.completedFuture(oper); - }); + oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess())); oper.startOperation(params).cancel(false); assertTrue(executor.runAll()); @@ -296,8 +305,7 @@ public class OperatorPartialTest { public void testStartPreprocessorBuilderException() { oper = new MyOper() { @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { throw new IllegalStateException(EXPECTED_EXCEPTION); } }; @@ -312,51 +320,27 @@ public class OperatorPartialTest { } @Test - public void testDoPreprocessorAsFuture() { - assertNull(oper.doPreprocessorAsFuture(params)); + public void testStartPreprocessorAsync() { + assertNull(oper.startPreprocessorAsync(params)); } @Test - public void testStartOperationOnly_testDoOperationAsFuture() { + public void testStartOperationAsync() { oper.startOperation(params); assertTrue(executor.runAll()); assertEquals(1, oper.getCount()); } - /** - * Tests startOperationOnce() when - * {@link OperatorPartial#doOperationAsFuture(ControlLoopOperationParams)} throws an - * exception. - */ - @Test - public void testStartOperationOnceBuilderException() { - oper = new MyOper() { - @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture( - ControlLoopOperationParams params, int attempt) { - throw new IllegalStateException(EXPECTED_EXCEPTION); - } - }; - - oper.configure(new TreeMap<>()); - oper.start(); - - assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params)); - - // should be nothing in the queue - assertEquals(0, executor.getQueueLength()); - } - @Test public void testIsSuccess() { - ControlLoopOperation outcome = new ControlLoopOperation(); + OperationOutcome outcome = new OperationOutcome(); - outcome.setOutcome(PolicyResult.SUCCESS.toString()); + outcome.setResult(PolicyResult.SUCCESS); assertTrue(oper.isSuccess(outcome)); - for (String failure : FAILURE_STRINGS) { - outcome.setOutcome(failure); + for (PolicyResult failure : FAILURE_RESULTS) { + outcome.setResult(failure); assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome)); } } @@ -365,17 +349,17 @@ public class OperatorPartialTest { public void testIsActorFailed() { assertFalse(oper.isActorFailed(null)); - ControlLoopOperation outcome = params.makeOutcome(); + OperationOutcome outcome = params.makeOutcome(); // incorrect outcome - outcome.setOutcome(PolicyResult.SUCCESS.toString()); + outcome.setResult(PolicyResult.SUCCESS); assertFalse(oper.isActorFailed(outcome)); - outcome.setOutcome(PolicyResult.FAILURE_RETRIES.toString()); + outcome.setResult(PolicyResult.FAILURE_RETRIES); assertFalse(oper.isActorFailed(outcome)); // correct outcome - outcome.setOutcome(PolicyResult.FAILURE.toString()); + outcome.setResult(PolicyResult.FAILURE); // incorrect actor outcome.setActor(TARGET); @@ -400,7 +384,12 @@ public class OperatorPartialTest { /* * Use an operator that doesn't override doOperation(). */ - OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {}; + OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) { + @Override + protected Executor getBlockingExecutor() { + return executor; + } + }; oper2.configure(new TreeMap<>()); oper2.start(); @@ -409,7 +398,7 @@ public class OperatorPartialTest { assertTrue(executor.runAll()); assertNotNull(opend); - assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), opend.getOutcome()); + assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult()); } @Test @@ -421,36 +410,34 @@ public class OperatorPartialTest { // trigger timeout very quickly oper = new MyOper() { @Override - protected long getTimeOutMillis(Policy policy) { + protected long getTimeOutMillis(Integer timeoutSec) { return 1; } @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture( - ControlLoopOperationParams params, int attempt) { - - return outcome -> { - ControlLoopOperation outcome2 = params.makeOutcome(); - outcome2.setOutcome(PolicyResult.SUCCESS.toString()); - - /* - * Create an incomplete future that will timeout after the operation's - * timeout. If it fires before the other timer, then it will return a - * SUCCESS outcome. - */ - CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>(); - future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome, - params.getExecutor()); - - return future; - }; + protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, + int attempt, OperationOutcome outcome) { + + OperationOutcome outcome2 = params.makeOutcome(); + outcome2.setResult(PolicyResult.SUCCESS); + + /* + * Create an incomplete future that will timeout after the operation's + * timeout. If it fires before the other timer, then it will return a + * SUCCESS outcome. + */ + CompletableFuture<OperationOutcome> future = new CompletableFuture<>(); + future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome, + params.getExecutor()); + + return future; } }; oper.configure(new TreeMap<>()); oper.start(); - assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), oper.startOperation(params).get().getOutcome()); + assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.startOperation(params).get().getResult()); } /** @@ -466,40 +453,45 @@ public class OperatorPartialTest { // trigger timeout very quickly oper = new MyOper() { @Override - protected long getTimeOutMillis(Policy policy) { + protected long getTimeOutMillis(Integer timeoutSec) { return 10; } @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { - - return outcome -> { - outcome.setOutcome(PolicyResult.SUCCESS.toString()); - - /* - * Create an incomplete future that will timeout after the operation's - * timeout. If it fires before the other timer, then it will return a - * SUCCESS outcome. - */ - CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>(); - future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome, - params.getExecutor()); - - return future; + protected Executor getBlockingExecutor() { + return command -> { + Thread thread = new Thread(command); + thread.start(); }; } + + @Override + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { + + OperationOutcome outcome = makeSuccess(); + + /* + * Create an incomplete future that will timeout after the operation's + * timeout. If it fires before the other timer, then it will return a + * SUCCESS outcome. + */ + CompletableFuture<OperationOutcome> future = new CompletableFuture<>(); + future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome, + params.getExecutor()); + + return future; + } }; oper.configure(new TreeMap<>()); oper.start(); - ControlLoopOperation result = oper.startOperation(params).get(); - assertEquals(PolicyResult.SUCCESS.toString(), result.getOutcome()); + OperationOutcome result = oper.startOperation(params).get(); + assertEquals(PolicyResult.SUCCESS, result.getResult()); assertNotNull(opstart); assertNotNull(opend); - assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome()); + assertEquals(PolicyResult.SUCCESS, opend.getResult()); assertEquals(1, numStart); assertEquals(1, oper.getCount()); @@ -510,8 +502,8 @@ public class OperatorPartialTest { * Tests retry functions, when the count is set to zero and retries are exhausted. */ @Test - public void testSetRetryFlag_testRetryOnFailure_ZeroRetries() { - policy.setRetry(0); + public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() { + params = params.toBuilder().retry(0).build(); oper.setMaxFailures(10); verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE); @@ -522,7 +514,7 @@ public class OperatorPartialTest { */ @Test public void testSetRetryFlag_testRetryOnFailure_NullRetries() { - policy.setRetry(null); + params = params.toBuilder().retry(null).build(); oper.setMaxFailures(10); verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE); @@ -534,10 +526,11 @@ public class OperatorPartialTest { @Test public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() { final int maxRetries = 3; - policy.setRetry(maxRetries); + params = params.toBuilder().retry(maxRetries).build(); oper.setMaxFailures(10); - verifyRun("testVerifyRunningWhenNot", maxRetries + 1, maxRetries + 1, PolicyResult.FAILURE_RETRIES); + verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1, + PolicyResult.FAILURE_RETRIES); } /** @@ -545,7 +538,7 @@ public class OperatorPartialTest { */ @Test public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() { - policy.setRetry(10); + params = params.toBuilder().retry(10).build(); final int maxFailures = 3; oper.setMaxFailures(maxFailures); @@ -563,8 +556,8 @@ public class OperatorPartialTest { // arrange to return null from doOperation() oper = new MyOper() { @Override - protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, - ControlLoopOperation operation) { + protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, + OperationOutcome operation) { // update counters super.doOperation(params, attempt, operation); @@ -579,96 +572,55 @@ public class OperatorPartialTest { } @Test - public void testGetActorOutcome() { - assertNull(oper.getActorOutcome(null)); + public void testIsSameOperation() { + assertFalse(oper.isSameOperation(null)); - ControlLoopOperation outcome = params.makeOutcome(); - outcome.setOutcome(TARGET); + OperationOutcome outcome = params.makeOutcome(); - // wrong actor - should be null + // wrong actor - should be false outcome.setActor(null); - assertNull(oper.getActorOutcome(outcome)); + assertFalse(oper.isSameOperation(outcome)); outcome.setActor(TARGET); - assertNull(oper.getActorOutcome(outcome)); + assertFalse(oper.isSameOperation(outcome)); outcome.setActor(ACTOR); // wrong operation - should be null outcome.setOperation(null); - assertNull(oper.getActorOutcome(outcome)); + assertFalse(oper.isSameOperation(outcome)); outcome.setOperation(TARGET); - assertNull(oper.getActorOutcome(outcome)); + assertFalse(oper.isSameOperation(outcome)); outcome.setOperation(OPERATOR); - assertEquals(TARGET, oper.getActorOutcome(outcome)); - } - - @Test - public void testOnSuccess() throws Exception { - AtomicInteger count = new AtomicInteger(); - - final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep = oper -> { - count.incrementAndGet(); - return CompletableFuture.completedFuture(oper); - }; - - // pass it a null outcome - ControlLoopOperation outcome = oper.onSuccess(params, nextStep).apply(null).get(); - assertNotNull(outcome); - assertEquals(PolicyResult.FAILURE.toString(), outcome.getOutcome()); - assertEquals(0, count.get()); - - // pass it an unpopulated (i.e., failed) outcome - outcome = new ControlLoopOperation(); - assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get()); - assertEquals(0, count.get()); - - // pass it a successful outcome - outcome = params.makeOutcome(); - outcome.setOutcome(PolicyResult.SUCCESS.toString()); - assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get()); - assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome()); - assertEquals(1, count.get()); + assertTrue(oper.isSameOperation(outcome)); } /** - * Tests onSuccess() and handleFailure() when the outcome is a success. + * Tests handleFailure() when the outcome is a success. */ @Test - public void testOnSuccessTrue_testHandleFailureTrue() { - // arrange to return a success from the preprocessor - oper.setPreProcessor(oper -> { - oper.setOutcome(PolicyResult.SUCCESS.toString()); - return CompletableFuture.completedFuture(oper); - }); - - verifyRun("testOnSuccessTrue_testHandleFailureTrue", 1, 1, PolicyResult.SUCCESS); + public void testHandlePreprocessorFailureTrue() { + oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess())); + verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS); } /** - * Tests onSuccess() and handleFailure() when the outcome is <i>not</i> a success. + * Tests handleFailure() when the outcome is <i>not</i> a success. */ @Test - public void testOnSuccessFalse_testHandleFailureFalse() throws Exception { - // arrange to return a failure from the preprocessor - oper.setPreProcessor(oper -> { - oper.setOutcome(PolicyResult.FAILURE.toString()); - return CompletableFuture.completedFuture(oper); - }); - - verifyRun("testOnSuccessFalse_testHandleFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD); + public void testHandlePreprocessorFailureFalse() throws Exception { + oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure())); + verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD); } /** - * Tests onSuccess() and handleFailure() when the outcome is {@code null}. + * Tests handleFailure() when the outcome is {@code null}. */ @Test - public void testOnSuccessFalse_testHandleFailureNull() throws Exception { + public void testHandlePreprocessorFailureNull() throws Exception { // arrange to return null from the preprocessor - oper.setPreProcessor(oper -> { - return CompletableFuture.completedFuture(null); - }); + oper.setPreProcessor(CompletableFuture.completedFuture(null)); - verifyRun("testOnSuccessFalse_testHandleFailureNull", 1, 0, PolicyResult.FAILURE_GUARD); + verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD); } @Test @@ -688,11 +640,374 @@ public class OperatorPartialTest { } /** - * Tests verifyRunning() when the pipeline is not running. + * Tests both flavors of anyOf(), because one invokes the other. + */ + @Test + public void testAnyOf() throws Exception { + // first task completes, others do not + List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + + final OperationOutcome outcome = params.makeOutcome(); + + tasks.add(CompletableFuture.completedFuture(outcome)); + tasks.add(new CompletableFuture<>()); + tasks.add(new CompletableFuture<>()); + + CompletableFuture<OperationOutcome> result = oper.anyOf(params, tasks); + assertTrue(executor.runAll()); + + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + + // second task completes, others do not + tasks = new LinkedList<>(); + + tasks.add(new CompletableFuture<>()); + tasks.add(CompletableFuture.completedFuture(outcome)); + tasks.add(new CompletableFuture<>()); + + result = oper.anyOf(params, tasks); + assertTrue(executor.runAll()); + + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + + // third task completes, others do not + tasks = new LinkedList<>(); + + tasks.add(new CompletableFuture<>()); + tasks.add(new CompletableFuture<>()); + tasks.add(CompletableFuture.completedFuture(outcome)); + + result = oper.anyOf(params, tasks); + assertTrue(executor.runAll()); + + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + } + + /** + * Tests both flavors of allOf(), because one invokes the other. + */ + @Test + public void testAllOf() throws Exception { + List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + + final OperationOutcome outcome = params.makeOutcome(); + + CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>(); + CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>(); + CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>(); + + tasks.add(future1); + tasks.add(future2); + tasks.add(future3); + + CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks); + + assertTrue(executor.runAll()); + assertFalse(result.isDone()); + future1.complete(outcome); + + // complete 3 before 2 + assertTrue(executor.runAll()); + assertFalse(result.isDone()); + future3.complete(outcome); + + assertTrue(executor.runAll()); + assertFalse(result.isDone()); + future2.complete(outcome); + + // all of them are now done + assertTrue(executor.runAll()); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + } + + @Test + public void testCombineOutcomes() throws Exception { + // only one outcome + verifyOutcomes(0, PolicyResult.SUCCESS); + verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION); + + // maximum is in different positions + verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD); + verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD); + verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE); + + // null outcome + final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + tasks.add(CompletableFuture.completedFuture(null)); + CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks); + + assertTrue(executor.runAll()); + assertTrue(result.isDone()); + assertNull(result.get()); + + // one throws an exception during execution + IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION); + + tasks.clear(); + tasks.add(CompletableFuture.completedFuture(params.makeOutcome())); + tasks.add(CompletableFuture.failedFuture(except)); + tasks.add(CompletableFuture.completedFuture(params.makeOutcome())); + result = oper.allOf(params, tasks); + + assertTrue(executor.runAll()); + assertTrue(result.isCompletedExceptionally()); + result.whenComplete((unused, thrown) -> assertSame(except, thrown)); + } + + private void verifyOutcomes(int expected, PolicyResult... results) throws Exception { + List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + + + OperationOutcome expectedOutcome = null; + + for (int count = 0; count < results.length; ++count) { + OperationOutcome outcome = params.makeOutcome(); + outcome.setResult(results[count]); + tasks.add(CompletableFuture.completedFuture(outcome)); + + if (count == expected) { + expectedOutcome = outcome; + } + } + + CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks); + + assertTrue(executor.runAll()); + assertTrue(result.isDone()); + assertSame(expectedOutcome, result.get()); + } + + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask( + final OperationOutcome taskOutcome) { + + return outcome -> CompletableFuture.completedFuture(taskOutcome); + } + + @Test + public void testDetmPriority() { + assertEquals(1, oper.detmPriority(null)); + + OperationOutcome outcome = params.makeOutcome(); + + Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2, + PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5, + PolicyResult.FAILURE_EXCEPTION, 6); + + for (Entry<PolicyResult, Integer> ent : map.entrySet()) { + outcome.setResult(ent.getKey()); + assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome)); + } + } + + /** + * Tests doTask(Future) when the controller is not running. + */ + @Test + public void testDoTaskFutureNotRunning() throws Exception { + CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + controller.complete(params.makeOutcome()); + + CompletableFuture<OperationOutcome> future = + oper.doTask(params, controller, false, params.makeOutcome(), taskFuture); + assertFalse(future.isDone()); + assertTrue(executor.runAll()); + + // should not have run the task + assertFalse(future.isDone()); + + // should have canceled the task future + assertTrue(taskFuture.isCancelled()); + } + + /** + * Tests doTask(Future) when the previous outcome was successful. + */ + @Test + public void testDoTaskFutureSuccess() throws Exception { + CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); + final OperationOutcome taskOutcome = params.makeOutcome(); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = + oper.doTask(params, controller, true, params.makeOutcome(), taskFuture); + + taskFuture.complete(taskOutcome); + assertTrue(executor.runAll()); + + assertTrue(future.isDone()); + assertSame(taskOutcome, future.get()); + + // controller should not be done yet + assertFalse(controller.isDone()); + } + + /** + * Tests doTask(Future) when the previous outcome was failed. + */ + @Test + public void testDoTaskFutureFailure() throws Exception { + CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); + final OperationOutcome failedOutcome = params.makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, failedOutcome, taskFuture); + assertFalse(future.isDone()); + assertTrue(executor.runAll()); + + // should not have run the task + assertFalse(future.isDone()); + + // should have canceled the task future + assertTrue(taskFuture.isCancelled()); + + // controller SHOULD be done now + assertTrue(controller.isDone()); + assertSame(failedOutcome, controller.get()); + } + + /** + * Tests doTask(Future) when the previous outcome was failed, but not checking + * success. + */ + @Test + public void testDoTaskFutureUncheckedFailure() throws Exception { + CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); + final OperationOutcome failedOutcome = params.makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, failedOutcome, taskFuture); + assertFalse(future.isDone()); + + // complete the task + OperationOutcome taskOutcome = params.makeOutcome(); + taskFuture.complete(taskOutcome); + + assertTrue(executor.runAll()); + + // should have run the task + assertTrue(future.isDone()); + + assertTrue(future.isDone()); + assertSame(taskOutcome, future.get()); + + // controller should not be done yet + assertFalse(controller.isDone()); + } + + /** + * Tests doTask(Function) when the controller is not running. + */ + @Test + public void testDoTaskFunctionNotRunning() throws Exception { + AtomicBoolean invoked = new AtomicBoolean(); + + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> { + invoked.set(true); + return CompletableFuture.completedFuture(params.makeOutcome()); + }; + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + controller.complete(params.makeOutcome()); + + CompletableFuture<OperationOutcome> future = + oper.doTask(params, controller, false, task).apply(params.makeOutcome()); + assertFalse(future.isDone()); + assertTrue(executor.runAll()); + + // should not have run the task + assertFalse(future.isDone()); + + // should not have even invoked the task + assertFalse(invoked.get()); + } + + /** + * Tests doTask(Function) when the previous outcome was successful. + */ + @Test + public void testDoTaskFunctionSuccess() throws Exception { + final OperationOutcome taskOutcome = params.makeOutcome(); + + final OperationOutcome failedOutcome = params.makeOutcome(); + + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome); + + assertTrue(future.isDone()); + assertSame(taskOutcome, future.get()); + + // controller should not be done yet + assertFalse(controller.isDone()); + } + + /** + * Tests doTask(Function) when the previous outcome was failed. + */ + @Test + public void testDoTaskFunctionFailure() throws Exception { + final OperationOutcome failedOutcome = params.makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + + AtomicBoolean invoked = new AtomicBoolean(); + + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> { + invoked.set(true); + return CompletableFuture.completedFuture(params.makeOutcome()); + }; + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome); + assertFalse(future.isDone()); + assertTrue(executor.runAll()); + + // should not have run the task + assertFalse(future.isDone()); + + // should not have even invoked the task + assertFalse(invoked.get()); + + // controller should have the failed task + assertTrue(controller.isDone()); + assertSame(failedOutcome, controller.get()); + } + + /** + * Tests doTask(Function) when the previous outcome was failed, but not checking + * success. */ @Test - public void testVerifyRunningWhenNot() { - verifyRun("testVerifyRunningWhenNot", 0, 0, PolicyResult.SUCCESS, future -> future.cancel(false)); + public void testDoTaskFunctionUncheckedFailure() throws Exception { + final OperationOutcome taskOutcome = params.makeOutcome(); + + final OperationOutcome failedOutcome = params.makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome); + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, task).apply(failedOutcome); + + assertTrue(future.isDone()); + assertSame(taskOutcome, future.get()); + + // controller should not be done yet + assertFalse(controller.isDone()); } /** @@ -700,7 +1015,7 @@ public class OperatorPartialTest { */ @Test public void testCallbackStartedNotRunning() { - AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>(); + AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>(); /* * arrange to stop the controller when the start-callback is invoked, but capture @@ -723,7 +1038,7 @@ public class OperatorPartialTest { */ @Test public void testCallbackCompletedNotRunning() { - AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>(); + AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>(); // arrange to stop the controller when the start-callback is invoked params = params.toBuilder().startCallback(oper -> { @@ -739,36 +1054,36 @@ public class OperatorPartialTest { } @Test - public void testSetOutcomeControlLoopOperationThrowable() { + public void testSetOutcomeControlLoopOperationOutcomeThrowable() { final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION)); - ControlLoopOperation outcome; + OperationOutcome outcome; - outcome = new ControlLoopOperation(); + outcome = new OperationOutcome(); oper.setOutcome(params, outcome, timex); assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage()); - assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), outcome.getOutcome()); + assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult()); - outcome = new ControlLoopOperation(); - oper.setOutcome(params, outcome, new IllegalStateException()); + outcome = new OperationOutcome(); + oper.setOutcome(params, outcome, new IllegalStateException(EXPECTED_EXCEPTION)); assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage()); - assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), outcome.getOutcome()); + assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult()); } @Test - public void testSetOutcomeControlLoopOperationPolicyResult() { - ControlLoopOperation outcome; + public void testSetOutcomeControlLoopOperationOutcomePolicyResult() { + OperationOutcome outcome; - outcome = new ControlLoopOperation(); + outcome = new OperationOutcome(); oper.setOutcome(params, outcome, PolicyResult.SUCCESS); assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage()); - assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome()); + assertEquals(PolicyResult.SUCCESS, outcome.getResult()); for (PolicyResult result : FAILURE_RESULTS) { - outcome = new ControlLoopOperation(); + outcome = new OperationOutcome(); oper.setOutcome(params, outcome, result); assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage()); - assertEquals(result.toString(), result.toString(), outcome.getOutcome()); + assertEquals(result.toString(), result, outcome.getResult()); } } @@ -776,7 +1091,7 @@ public class OperatorPartialTest { public void testIsTimeout() { final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION); - assertFalse(oper.isTimeout(new IllegalStateException())); + assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION))); assertFalse(oper.isTimeout(new IllegalStateException(timex))); assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex)))); assertFalse(oper.isTimeout(new CompletionException(null))); @@ -788,19 +1103,19 @@ public class OperatorPartialTest { @Test public void testGetTimeOutMillis() { - assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(policy)); + assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(params.getTimeoutSec())); - policy.setTimeout(null); - assertEquals(0, oper.getTimeOutMillis(policy)); + params = params.toBuilder().timeoutSec(null).build(); + assertEquals(0, oper.getTimeOutMillis(params.getTimeoutSec())); } - private void starter(ControlLoopOperation oper) { + private void starter(OperationOutcome oper) { ++numStart; tstart = oper.getStart(); opstart = oper; } - private void completer(ControlLoopOperation oper) { + private void completer(OperationOutcome oper) { ++numEnd; opend = oper; } @@ -816,21 +1131,18 @@ public class OperatorPartialTest { }; } - /** - * Verifies a run. - * - * @param testName test name - * @param expectedCallbacks number of callbacks expected - * @param expectedOperations number of operation invocations expected - * @param expectedResult expected outcome - */ - private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, - PolicyResult expectedResult) { + private OperationOutcome makeSuccess() { + OperationOutcome outcome = params.makeOutcome(); + outcome.setResult(PolicyResult.SUCCESS); - String expectedSubRequestId = - (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations)); + return outcome; + } - verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop()); + private OperationOutcome makeFailure() { + OperationOutcome outcome = params.makeOutcome(); + outcome.setResult(PolicyResult.FAILURE); + + return outcome; } /** @@ -840,17 +1152,14 @@ public class OperatorPartialTest { * @param expectedCallbacks number of callbacks expected * @param expectedOperations number of operation invocations expected * @param expectedResult expected outcome - * @param manipulator function to modify the future returned by - * {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before - * the tasks in the executor are run */ - private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult, - Consumer<CompletableFuture<ControlLoopOperation>> manipulator) { + private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, + PolicyResult expectedResult) { String expectedSubRequestId = (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations)); - verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, manipulator); + verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop()); } /** @@ -866,9 +1175,9 @@ public class OperatorPartialTest { * the tasks in the executor are run */ private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult, - String expectedSubRequestId, Consumer<CompletableFuture<ControlLoopOperation>> manipulator) { + String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) { - CompletableFuture<ControlLoopOperation> future = oper.startOperation(params); + CompletableFuture<OperationOutcome> future = oper.startOperation(params); manipulator.accept(future); @@ -880,7 +1189,7 @@ public class OperatorPartialTest { if (expectedCallbacks > 0) { assertNotNull(testName, opstart); assertNotNull(testName, opend); - assertEquals(testName, expectedResult.toString(), opend.getOutcome()); + assertEquals(testName, expectedResult, opend.getResult()); assertSame(testName, tstart, opstart.getStart()); assertSame(testName, tstart, opend.getStart()); @@ -901,7 +1210,7 @@ public class OperatorPartialTest { assertEquals(testName, expectedOperations, oper.getCount()); } - private static class MyOper extends OperatorPartial { + private class MyOper extends OperatorPartial { @Getter private int count = 0; @@ -912,15 +1221,15 @@ public class OperatorPartialTest { private int maxFailures = 0; @Setter - private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preProcessor; + private CompletableFuture<OperationOutcome> preProcessor; public MyOper() { super(ACTOR, OPERATOR); } @Override - protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt, - ControlLoopOperation operation) { + protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, + OperationOutcome operation) { ++count; if (genException) { throw new IllegalStateException(EXPECTED_EXCEPTION); @@ -929,19 +1238,22 @@ public class OperatorPartialTest { operation.setSubRequestId(String.valueOf(attempt)); if (count > maxFailures) { - operation.setOutcome(PolicyResult.SUCCESS.toString()); + operation.setResult(PolicyResult.SUCCESS); } else { - operation.setOutcome(PolicyResult.FAILURE.toString()); + operation.setResult(PolicyResult.FAILURE); } return operation; } @Override - protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture( - ControlLoopOperationParams params) { + protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { + return (preProcessor != null ? preProcessor : super.startPreprocessorAsync(params)); + } - return (preProcessor != null ? preProcessor : super.doPreprocessorAsFuture(params)); + @Override + protected Executor getBlockingExecutor() { + return executor; } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java index 0c8e77d38..9dd19d548 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java @@ -35,6 +35,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -47,20 +49,23 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.policy.common.parameters.BeanValidationResult; -import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.VirtualControlLoopEvent; import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Operator; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams.ControlLoopOperationParamsBuilder; import org.onap.policy.controlloop.actorserviceprovider.spi.Actor; -import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.Target; public class ControlLoopOperationParamsTest { private static final String EXPECTED_EXCEPTION = "expected exception"; private static final String ACTOR = "my-actor"; private static final String OPERATION = "my-operation"; - private static final String TARGET = "my-target"; + private static final Target TARGET = new Target(); + private static final String TARGET_ENTITY = "my-target"; + private static final Integer RETRY = 3; + private static final Integer TIMEOUT = 100; private static final UUID REQ_ID = UUID.randomUUID(); @Mock @@ -70,7 +75,7 @@ public class ControlLoopOperationParamsTest { private ActorService actorService; @Mock - private Consumer<ControlLoopOperation> completer; + private Consumer<OperationOutcome> completer; @Mock private ControlLoopEventContext context; @@ -82,19 +87,18 @@ public class ControlLoopOperationParamsTest { private Executor executor; @Mock - private CompletableFuture<ControlLoopOperation> operation; + private CompletableFuture<OperationOutcome> operation; @Mock private Operator operator; @Mock - private Policy policy; + private Consumer<OperationOutcome> starter; - @Mock - private Consumer<ControlLoopOperation> starter; + private Map<String, String> payload; private ControlLoopOperationParams params; - private ControlLoopOperation outcome; + private OperationOutcome outcome; /** @@ -112,12 +116,12 @@ public class ControlLoopOperationParamsTest { when(context.getEvent()).thenReturn(event); - when(policy.getActor()).thenReturn(ACTOR); - when(policy.getRecipe()).thenReturn(OPERATION); + payload = new TreeMap<>(); params = ControlLoopOperationParams.builder().actorService(actorService).completeCallback(completer) - .context(context).executor(executor).policy(policy).startCallback(starter).target(TARGET) - .build(); + .context(context).executor(executor).actor(ACTOR).operation(OPERATION).payload(payload) + .retry(RETRY).target(TARGET).targetEntity(TARGET_ENTITY).timeoutSec(TIMEOUT) + .startCallback(starter).build(); outcome = params.makeOutcome(); } @@ -130,30 +134,6 @@ public class ControlLoopOperationParamsTest { } @Test - public void testGetActor() { - assertEquals(ACTOR, params.getActor()); - - // try with null policy - assertEquals(ControlLoopOperationParams.UNKNOWN, params.toBuilder().policy(null).build().getActor()); - - // try with null name in the policy - when(policy.getActor()).thenReturn(null); - assertEquals(ControlLoopOperationParams.UNKNOWN, params.getActor()); - } - - @Test - public void testGetOperation() { - assertEquals(OPERATION, params.getOperation()); - - // try with null policy - assertEquals(ControlLoopOperationParams.UNKNOWN, params.toBuilder().policy(null).build().getOperation()); - - // try with null name in the policy - when(policy.getRecipe()).thenReturn(null); - assertEquals(ControlLoopOperationParams.UNKNOWN, params.getOperation()); - } - - @Test public void testGetRequestId() { assertSame(REQ_ID, params.getRequestId()); @@ -170,20 +150,14 @@ public class ControlLoopOperationParamsTest { assertEquals(ACTOR, outcome.getActor()); assertEquals(OPERATION, outcome.getOperation()); checkRemainingFields("with actor"); - - // try again with a null policy - outcome = params.toBuilder().policy(null).build().makeOutcome(); - assertEquals(ControlLoopOperationParams.UNKNOWN, outcome.getActor()); - assertEquals(ControlLoopOperationParams.UNKNOWN, outcome.getOperation()); - checkRemainingFields("unknown actor"); } protected void checkRemainingFields(String testName) { - assertEquals(testName, TARGET, outcome.getTarget()); - assertNotNull(testName, outcome.getStart()); + assertEquals(testName, TARGET_ENTITY, outcome.getTarget()); + assertNull(testName, outcome.getStart()); assertNull(testName, outcome.getEnd()); assertNull(testName, outcome.getSubRequestId()); - assertNull(testName, outcome.getOutcome()); + assertNotNull(testName, outcome.getResult()); assertNull(testName, outcome.getMessage()); } @@ -239,21 +213,23 @@ public class ControlLoopOperationParamsTest { @Test public void testValidateFields() { + testValidate("actor", "null", bldr -> bldr.actor(null)); testValidate("actorService", "null", bldr -> bldr.actorService(null)); testValidate("context", "null", bldr -> bldr.context(null)); testValidate("executor", "null", bldr -> bldr.executor(null)); - testValidate("policy", "null", bldr -> bldr.policy(null)); - testValidate("target", "null", bldr -> bldr.target(null)); + testValidate("operation", "null", bldr -> bldr.operation(null)); + testValidate("target", "null", bldr -> bldr.targetEntity(null)); // check edge cases assertTrue(params.toBuilder().build().validate().isValid()); // these can be null - assertTrue(params.toBuilder().startCallback(null).completeCallback(null).build().validate().isValid()); + assertTrue(params.toBuilder().payload(null).retry(null).target(null).timeoutSec(null).startCallback(null) + .completeCallback(null).build().validate().isValid()); // test with minimal fields - assertTrue(ControlLoopOperationParams.builder().actorService(actorService).context(context).policy(policy) - .target(TARGET).build().validate().isValid()); + assertTrue(ControlLoopOperationParams.builder().actorService(actorService).context(context).actor(ACTOR) + .operation(OPERATION).targetEntity(TARGET_ENTITY).build().validate().isValid()); } private void testValidate(String fieldName, String expected, @@ -275,7 +251,12 @@ public class ControlLoopOperationParamsTest { } @Test - public void testActorService() { + public void testGetActor() { + assertSame(ACTOR, params.getActor()); + } + + @Test + public void testGetActorService() { assertSame(actorService, params.getActorService()); } @@ -293,8 +274,43 @@ public class ControlLoopOperationParamsTest { } @Test - public void testGetPolicy() { - assertSame(policy, params.getPolicy()); + public void testGetOperation() { + assertSame(OPERATION, params.getOperation()); + } + + @Test + public void testGetPayload() { + assertSame(payload, params.getPayload()); + + // should be null when unspecified + assertNull(ControlLoopOperationParams.builder().build().getPayload()); + } + + @Test + public void testGetRetry() { + assertSame(RETRY, params.getRetry()); + + // should be null when unspecified + assertNull(ControlLoopOperationParams.builder().build().getRetry()); + } + + @Test + public void testTarget() { + assertSame(TARGET, params.getTarget()); + + // should be null when unspecified + assertNull(ControlLoopOperationParams.builder().build().getTarget()); + } + + @Test + public void testGetTimeoutSec() { + assertSame(TIMEOUT, params.getTimeoutSec()); + + // should be 300 when unspecified + assertEquals(Integer.valueOf(300), ControlLoopOperationParams.builder().build().getTimeoutSec()); + + // null should be ok too + assertNull(ControlLoopOperationParams.builder().timeoutSec(null).build().getTimeoutSec()); } @Test @@ -308,7 +324,7 @@ public class ControlLoopOperationParamsTest { } @Test - public void testGetTarget() { - assertEquals(TARGET, params.getTarget()); + public void testGetTargetEntity() { + assertEquals(TARGET_ENTITY, params.getTargetEntity()); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java index 1763388f2..6c1f538ec 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java @@ -90,6 +90,8 @@ public class HttpActorParamsTest { @Test public void testValidate() { + assertTrue(params.validate(CONTAINER).isValid()); + testValidateField("clientName", "null", params2 -> params2.setClientName(null)); testValidateField("path", "null", params2 -> params2.setPath(null)); testValidateField("timeoutSec", "minimum", params2 -> params2.setTimeoutSec(-1)); diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java index 829c480d1..6cf7328ca 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java @@ -47,6 +47,8 @@ public class HttpParamsTest { @Test public void testValidate() { + assertTrue(params.validate(CONTAINER).isValid()); + testValidateField("clientName", "null", bldr -> bldr.clientName(null)); testValidateField("path", "null", bldr -> bldr.path(null)); testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1)); diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java index b421c1ce2..a6b11ef65 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java @@ -23,21 +23,27 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -58,9 +64,10 @@ public class PipelineControllerFutureTest { private Future<String> future2; @Mock - private CompletableFuture<String> compFuture; + private Executor executor; + private CompletableFuture<String> compFuture; private PipelineControllerFuture<String> controller; @@ -72,6 +79,8 @@ public class PipelineControllerFutureTest { public void setUp() { MockitoAnnotations.initMocks(this); + compFuture = spy(new CompletableFuture<>()); + controller = new PipelineControllerFuture<>(); controller.add(runnable1); @@ -89,10 +98,7 @@ public class PipelineControllerFutureTest { assertTrue(controller.isCancelled()); assertFalse(controller.isRunning()); - verify(runnable1).run(); - verify(runnable2).run(); - verify(future1).cancel(anyBoolean()); - verify(future2).cancel(anyBoolean()); + verifyStopped(); // re-invoke; nothing should change assertTrue(controller.cancel(true)); @@ -100,10 +106,155 @@ public class PipelineControllerFutureTest { assertTrue(controller.isCancelled()); assertFalse(controller.isRunning()); - verify(runnable1).run(); - verify(runnable2).run(); - verify(future1).cancel(anyBoolean()); - verify(future2).cancel(anyBoolean()); + verifyStopped(); + } + + @Test + public void testCompleteT() throws Exception { + assertTrue(controller.complete(TEXT)); + assertEquals(TEXT, controller.get()); + + verifyStopped(); + + // repeat - disallowed + assertFalse(controller.complete(TEXT)); + } + + @Test + public void testCompleteExceptionallyThrowable() { + assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION)); + assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION); + + verifyStopped(); + + // repeat - disallowed + assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION)); + } + + @Test + public void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception { + CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor); + + // haven't stopped anything yet + assertFalse(future.isDone()); + verify(runnable1, never()).run(); + + // get the operation and run it + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(executor).execute(captor.capture()); + captor.getValue().run(); + + // should be done now + assertTrue(future.isDone()); + + assertEquals(TEXT, future.get()); + + verifyStopped(); + } + + /** + * Tests completeAsync(executor) when canceled before execution. + */ + @Test + public void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() throws Exception { + CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor); + + assertTrue(future.cancel(false)); + + verifyStopped(); + + assertTrue(future.isDone()); + + assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class); + } + + @Test + public void testCompleteAsyncSupplierOfQextendsT() throws Exception { + CompletableFuture<String> future = controller.completeAsync(() -> TEXT); + assertEquals(TEXT, future.get()); + + verifyStopped(); + } + + /** + * Tests completeAsync() when canceled. + */ + @Test + public void testCompleteAsyncSupplierOfQextendsTCanceled() throws Exception { + CountDownLatch canceled = new CountDownLatch(1); + + // run async, but await until canceled + CompletableFuture<String> future = controller.completeAsync(() -> { + try { + canceled.await(); + } catch (InterruptedException e) { + // do nothing + } + + return TEXT; + }); + + assertTrue(future.cancel(false)); + + // let the future run now + canceled.countDown(); + + verifyStopped(); + + assertTrue(future.isDone()); + + assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class); + } + + @Test + public void testCompleteOnTimeoutTLongTimeUnit() throws Exception { + CountDownLatch stopped = new CountDownLatch(1); + controller.add(() -> stopped.countDown()); + + CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS); + + assertEquals(TEXT, future.get()); + + /* + * Must use latch instead of verifyStopped(), because the runnables may be + * executed asynchronously. + */ + assertTrue(stopped.await(5, TimeUnit.SECONDS)); + } + + /** + * Tests completeOnTimeout() when completed before the timeout. + */ + @Test + public void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception { + CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS); + controller.complete(TEXT); + + assertEquals(TEXT, future.get()); + + verifyStopped(); + } + + /** + * Tests completeOnTimeout() when canceled before the timeout. + */ + @Test + public void testCompleteOnTimeoutTLongTimeUnitCanceled() { + CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS); + assertTrue(future.cancel(true)); + + assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class); + + verifyStopped(); + } + + @Test + public void testNewIncompleteFuture() { + PipelineControllerFuture<String> future = controller.newIncompleteFuture(); + assertNotNull(future); + assertTrue(future instanceof PipelineControllerFuture); + assertNotSame(controller, future); + assertFalse(future.isDone()); } @Test @@ -208,22 +359,81 @@ public class PipelineControllerFutureTest { verify(future2).cancel(anyBoolean()); } + /** + * Tests both wrap() methods. + */ @Test - public void testAddFunction() { - AtomicReference<String> value = new AtomicReference<>(); + public void testWrap() throws Exception { + controller = spy(controller); - Function<String, CompletableFuture<String>> func = controller.add(input -> { - value.set(input); + CompletableFuture<String> future = controller.wrap(compFuture); + verify(controller, never()).remove(compFuture); + + compFuture.complete(TEXT); + assertEquals(TEXT, future.get()); + + verify(controller).remove(compFuture); + } + + /** + * Tests wrap(), when the controller is not running. + */ + @Test + public void testWrapNotRunning() throws Exception { + controller.cancel(false); + controller = spy(controller); + + assertFalse(controller.wrap(compFuture).isDone()); + verify(controller, never()).add(compFuture); + verify(controller, never()).remove(compFuture); + + verify(compFuture).cancel(anyBoolean()); + } + + /** + * Tests wrap(), when the future throws an exception. + */ + @Test + public void testWrapException() throws Exception { + controller = spy(controller); + + CompletableFuture<String> future = controller.wrap(compFuture); + verify(controller, never()).remove(compFuture); + + compFuture.completeExceptionally(EXPECTED_EXCEPTION); + assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION); + + verify(controller).remove(compFuture); + } + + @Test + public void testWrapFunction() throws Exception { + + Function<String, CompletableFuture<String>> func = controller.wrap(input -> { + compFuture.complete(input); return compFuture; }); - assertSame(compFuture, func.apply(TEXT)); - assertEquals(TEXT, value.get()); + CompletableFuture<String> future = func.apply(TEXT); + assertTrue(compFuture.isDone()); - verify(compFuture, never()).cancel(anyBoolean()); + assertEquals(TEXT, future.get()); // should not have completed the controller assertFalse(controller.isDone()); + } + + /** + * Tests add(Function) when the controller is canceled after the future is added. + */ + @Test + public void testWrapFunctionCancel() throws Exception { + Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture); + + CompletableFuture<String> future = func.apply(TEXT); + assertFalse(future.isDone()); + + assertFalse(compFuture.isDone()); // cancel - should propagate controller.cancel(false); @@ -235,10 +445,10 @@ public class PipelineControllerFutureTest { * Tests add(Function) when the controller is not running. */ @Test - public void testAddFunctionNotRunning() { + public void testWrapFunctionNotRunning() { AtomicReference<String> value = new AtomicReference<>(); - Function<String, CompletableFuture<String>> func = controller.add(input -> { + Function<String, CompletableFuture<String>> func = controller.wrap(input -> { value.set(input); return compFuture; }); @@ -251,4 +461,11 @@ public class PipelineControllerFutureTest { assertNull(value.get()); } + + private void verifyStopped() { + verify(runnable1).run(); + verify(runnable2).run(); + verify(future1).cancel(anyBoolean()); + verify(future2).cancel(anyBoolean()); + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml index f8a1e5112..c7fe46e47 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml +++ b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml @@ -35,8 +35,8 @@ <appender-ref ref="STDOUT" /> </root> - <!-- this is just an example --> - <logger name="ch.qos.logback.classic" level="off" additivity="false"> + <!-- this is required for UtilTest --> + <logger name="org.onap.policy.controlloop.actorserviceprovider.Util" level="info" additivity="false"> <appender-ref ref="STDOUT" /> </logger> </configuration> |