summaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors/actorServiceProvider/src
diff options
context:
space:
mode:
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src')
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java7
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java119
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java84
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java116
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java3
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java81
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java23
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java50
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java58
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java84
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java640
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java108
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java2
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java119
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java172
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java89
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java137
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java145
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java27
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java15
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java81
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java104
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java792
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java128
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java2
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java2
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java255
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml4
28 files changed, 2727 insertions, 720 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
index 13f09b1ad..2886b1feb 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
@@ -63,7 +63,6 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
return newActor;
}
- // TODO: should this throw an exception?
logger.warn("duplicate actor names for {}: {}, ignoring {}", name,
existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName());
return existingActor;
@@ -158,7 +157,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
for (Actor actor : name2actor.values()) {
if (actor.isConfigured()) {
- Util.logException(actor::start, "failed to start actor {}", actor.getName());
+ Util.runFunction(actor::start, "failed to start actor {}", actor.getName());
} else {
logger.warn("not starting unconfigured actor {}", actor.getName());
@@ -170,7 +169,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
protected void doStop() {
logger.info("stopping actors");
name2actor.values()
- .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName()));
+ .forEach(actor -> Util.runFunction(actor::stop, "failed to stop actor {}", actor.getName()));
}
@Override
@@ -179,7 +178,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
// @formatter:off
name2actor.values().forEach(
- actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName()));
+ actor -> Util.runFunction(actor::shutdown, "failed to shutdown actor {}", actor.getName()));
// @formatter:on
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java
new file mode 100644
index 000000000..d78403809
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java
@@ -0,0 +1,119 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import javax.ws.rs.client.InvocationCallback;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler for a <i>single</i> asynchronous response.
+ *
+ * @param <T> response type
+ */
+@Getter
+public abstract class AsyncResponseHandler<T> implements InvocationCallback<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class);
+
+ @Getter(AccessLevel.NONE)
+ private final PipelineControllerFuture<OperationOutcome> result = new PipelineControllerFuture<>();
+ private final ControlLoopOperationParams params;
+ private final OperationOutcome outcome;
+
+ /**
+ * Constructs the object.
+ *
+ * @param params operation parameters
+ * @param outcome outcome to be populated based on the response
+ */
+ public AsyncResponseHandler(ControlLoopOperationParams params, OperationOutcome outcome) {
+ this.params = params;
+ this.outcome = outcome;
+ }
+
+ /**
+ * Handles the given future, arranging to cancel it when the response is received.
+ *
+ * @param future future to be handled
+ * @return a future to be used to cancel or wait for the response
+ */
+ public CompletableFuture<OperationOutcome> handle(Future<T> future) {
+ result.add(future);
+ return result;
+ }
+
+ /**
+ * Invokes {@link #doComplete()} and then completes "this" with the returned value.
+ */
+ @Override
+ public void completed(T rawResponse) {
+ try {
+ logger.trace("{}.{}: response completed for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ result.complete(doComplete(rawResponse));
+
+ } catch (RuntimeException e) {
+ logger.trace("{}.{}: response handler threw an exception for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ result.completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Invokes {@link #doFailed()} and then completes "this" with the returned value.
+ */
+ @Override
+ public void failed(Throwable throwable) {
+ try {
+ logger.trace("{}.{}: response failure for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ result.complete(doFailed(throwable));
+
+ } catch (RuntimeException e) {
+ logger.trace("{}.{}: response failure handler threw an exception for {}", params.getActor(),
+ params.getOperation(), params.getRequestId());
+ result.completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Completes the processing of a response.
+ *
+ * @param rawResponse raw response that was received
+ * @return the outcome
+ */
+ protected abstract OperationOutcome doComplete(T rawResponse);
+
+ /**
+ * Handles a response exception.
+ *
+ * @param thrown exception that was thrown
+ * @return the outcome
+ */
+ protected abstract OperationOutcome doFailed(Throwable thrown);
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java
new file mode 100644
index 000000000..7d7c1d902
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java
@@ -0,0 +1,84 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Manager for "start" and "end" callbacks.
+ */
+public class CallbackManager implements Runnable {
+ private final AtomicReference<Instant> startTime = new AtomicReference<>();
+ private final AtomicReference<Instant> endTime = new AtomicReference<>();
+
+ /**
+ * Determines if the "start" callback can be invoked. If so, it sets the
+ * {@link #startTime} to the current time.
+ *
+ * @return {@code true} if the "start" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canStart() {
+ return startTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Determines if the "end" callback can be invoked. If so, it sets the
+ * {@link #endTime} to the current time.
+ *
+ * @return {@code true} if the "end" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canEnd() {
+ return endTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Gets the start time.
+ *
+ * @return the start time, or {@code null} if {@link #canStart()} has not been
+ * invoked yet.
+ */
+ public Instant getStartTime() {
+ return startTime.get();
+ }
+
+ /**
+ * Gets the end time.
+ *
+ * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
+ * yet.
+ */
+ public Instant getEndTime() {
+ return endTime.get();
+ }
+
+ /**
+ * Prevents further callbacks from being executed by setting {@link #startTime}
+ * and {@link #endTime}.
+ */
+ @Override
+ public void run() {
+ canStart();
+ canEnd();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java
new file mode 100644
index 000000000..6b0924807
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java
@@ -0,0 +1,116 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.time.Instant;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.policy.PolicyResult;
+
+/**
+ * Outcome from an operation. Objects of this type are passed from one stage to the next.
+ */
+@Data
+@NoArgsConstructor
+public class OperationOutcome {
+ private String actor;
+ private String operation;
+ private String target;
+ private Instant start;
+ private Instant end;
+ private String subRequestId;
+ private PolicyResult result = PolicyResult.SUCCESS;
+ private String message;
+
+ /**
+ * Copy constructor.
+ *
+ * @param source source object from which to copy
+ */
+ public OperationOutcome(OperationOutcome source) {
+ this.actor = source.actor;
+ this.operation = source.operation;
+ this.target = source.target;
+ this.start = source.start;
+ this.end = source.end;
+ this.subRequestId = source.subRequestId;
+ this.result = source.result;
+ this.message = source.message;
+ }
+
+ /**
+ * Creates a {@link ControlLoopOperation}, populating all fields with the values from
+ * this object. Sets the outcome field to the string representation of this object's
+ * outcome.
+ *
+ * @return
+ */
+ public ControlLoopOperation toControlLoopOperation() {
+ ControlLoopOperation clo = new ControlLoopOperation();
+
+ clo.setActor(actor);
+ clo.setOperation(operation);
+ clo.setTarget(target);
+ clo.setStart(start);
+ clo.setEnd(end);
+ clo.setSubRequestId(subRequestId);
+ clo.setOutcome(result.toString());
+ clo.setMessage(message);
+
+ return clo;
+ }
+
+ /**
+ * Determines if this outcome is for the given actor and operation.
+ *
+ * @param actor actor name
+ * @param operation operation name
+ * @return {@code true} if this outcome is for the given actor and operation
+ */
+ public boolean isFor(@NonNull String actor, @NonNull String operation) {
+ // do the operation check first, as it's most likely to be unique
+ return (operation.equals(this.operation) && actor.equals(this.actor));
+ }
+
+ /**
+ * Determines if an outcome is for the given actor and operation.
+ *
+ * @param outcome outcome to be examined, or {@code null}
+ * @param actor actor name
+ * @param operation operation name
+ * @return {@code true} if this outcome is for the given actor and operation,
+ * {@code false} it is {@code null} or not for the actor/operation
+ */
+ public static boolean isFor(OperationOutcome outcome, String actor, String operation) {
+ return (outcome != null && outcome.isFor(actor, operation));
+ }
+
+ /**
+ * Sets the result.
+ *
+ * @param result new result
+ */
+ public void setResult(@NonNull PolicyResult result) {
+ this.result = result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
index e308ee42e..c09460e34 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.onap.policy.common.capabilities.Configurable;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
/**
@@ -54,5 +53,5 @@ public interface Operator extends Startable, Configurable<Map<String, Object>> {
* @param params parameters needed to start the operation
* @return a future that can be used to cancel or await the result of the operation
*/
- CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params);
+ CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params);
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
index 0aba1a7fa..c3ddd17f3 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
@@ -23,6 +23,9 @@ package org.onap.policy.controlloop.actorserviceprovider;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
@@ -53,6 +56,82 @@ public class Util {
}
/**
+ * Logs a REST request. If the request is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param url request URL
+ * @param request request to be logged
+ */
+ public static <T> void logRestRequest(String url, T request) {
+ logRestRequest(new StandardCoder(), url, request);
+ }
+
+ /**
+ * Logs a REST request. If the request is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param coder coder to be used to pretty-print the request
+ * @param url request URL
+ * @param request request to be logged
+ */
+ protected static <T> void logRestRequest(Coder coder, String url, T request) {
+ String json;
+ try {
+ if (request instanceof String) {
+ json = request.toString();
+ } else {
+ json = coder.encode(request, true);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot pretty-print request", e);
+ json = request.toString();
+ }
+
+ NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json);
+ logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
+ }
+
+ /**
+ * Logs a REST response. If the response is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param url request URL
+ * @param response response to be logged
+ */
+ public static <T> void logRestResponse(String url, T response) {
+ logRestResponse(new StandardCoder(), url, response);
+ }
+
+ /**
+ * Logs a REST response. If the request is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param coder coder to be used to pretty-print the response
+ * @param url request URL
+ * @param response response to be logged
+ */
+ protected static <T> void logRestResponse(Coder coder, String url, T response) {
+ String json;
+ try {
+ if (response == null) {
+ json = null;
+ } else if (response instanceof String) {
+ json = response.toString();
+ } else {
+ json = coder.encode(response, true);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot pretty-print response", e);
+ json = response.toString();
+ }
+
+ NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json);
+ logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
+ }
+
+ /**
* Runs a function and logs a message if it throws an exception. Does <i>not</i>
* re-throw the exception.
*
@@ -60,7 +139,7 @@ public class Util {
* @param exceptionMessage message to log if an exception is thrown
* @param exceptionArgs arguments to be passed to the logger
*/
- public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) {
+ public static void runFunction(Runnable function, String exceptionMessage, Object... exceptionArgs) {
try {
function.run();
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
index 68bbe7edc..cd4d2570f 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
@@ -22,11 +22,12 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop;
import java.io.Serializable;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel;
import lombok.Getter;
+import lombok.NonNull;
import lombok.Setter;
-import org.onap.policy.aai.AaiCqResponse;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
/**
@@ -37,23 +38,35 @@ import org.onap.policy.controlloop.VirtualControlLoopEvent;
public class ControlLoopEventContext implements Serializable {
private static final long serialVersionUID = 1L;
- @Setter(AccessLevel.NONE)
+
private final VirtualControlLoopEvent event;
- private AaiCqResponse aaiCqResponse;
+ /**
+ * Enrichment data extracted from the event. Never {@code null}, though it may be
+ * immutable.
+ */
+ private final Map<String, String> enrichment;
- // TODO may remove this if it proves not to be needed
@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
private Map<String, Serializable> properties = new ConcurrentHashMap<>();
/**
+ * Request ID extracted from the event, or a generated value if the event has no
+ * request id; never {@code null}.
+ */
+ private final UUID requestId;
+
+
+ /**
* Constructs the object.
*
* @param event event with which this is associated
*/
- public ControlLoopEventContext(VirtualControlLoopEvent event) {
+ public ControlLoopEventContext(@NonNull VirtualControlLoopEvent event) {
this.event = event;
+ this.requestId = (event.getRequestId() != null ? event.getRequestId() : UUID.randomUUID());
+ this.enrichment = (event.getAai() != null ? event.getAai() : Map.of());
}
/**
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
index 9b9aa914e..d7f322e8a 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
@@ -20,14 +20,12 @@
package org.onap.policy.controlloop.actorserviceprovider.impl;
-import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
@@ -46,44 +44,42 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
/**
* Maps a name to an operator.
*/
- private Map<String, Operator> name2operator;
+ private final Map<String, Operator> name2operator = new ConcurrentHashMap<>();
/**
* Constructs the object.
*
* @param name actor name
- * @param operators the operations supported by this actor
*/
- public ActorImpl(String name, Operator... operators) {
+ public ActorImpl(String name) {
super(name);
- setOperators(Arrays.asList(operators));
}
/**
- * Sets the operators supported by this actor, overriding any previous list.
+ * Adds an operator supported by this actor.
*
- * @param operators the operations supported by this actor
+ * @param operator operation to be added
*/
- protected void setOperators(List<Operator> operators) {
+ protected synchronized void addOperator(Operator operator) {
+ /*
+ * This method is "synchronized" to prevent the state from changing while the
+ * operator is added. The map, itself, does not need synchronization as it's a
+ * concurrent map.
+ */
+
if (isConfigured()) {
throw new IllegalStateException("attempt to set operators on a configured actor: " + getName());
}
- Map<String, Operator> map = new HashMap<>();
- for (Operator newOp : operators) {
- map.compute(newOp.getName(), (opName, existingOp) -> {
- if (existingOp == null) {
- return newOp;
- }
-
- // TODO: should this throw an exception?
- logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName,
- existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName());
- return existingOp;
- });
- }
+ name2operator.compute(operator.getName(), (opName, existingOp) -> {
+ if (existingOp == null) {
+ return operator;
+ }
- this.name2operator = ImmutableMap.copyOf(map);
+ logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName,
+ existingOp.getClass().getSimpleName(), operator.getClass().getSimpleName());
+ return existingOp;
+ });
}
@Override
@@ -177,7 +173,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
for (Operator oper : name2operator.values()) {
if (oper.isConfigured()) {
- Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName());
+ Util.runFunction(oper::start, "failed to start operation {}.{}", actorName, oper.getName());
} else {
logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName());
@@ -195,7 +191,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
// @formatter:off
name2operator.values().forEach(
- oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName()));
+ oper -> Util.runFunction(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName()));
// @formatter:on
}
@@ -208,7 +204,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
logger.info("shutting down operations for actor {}", actorName);
// @formatter:off
- name2operator.values().forEach(oper -> Util.logException(oper::shutdown,
+ name2operator.values().forEach(oper -> Util.runFunction(oper::shutdown,
"failed to shutdown operation {}.{}", actorName, oper.getName()));
// @formatter:on
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java
new file mode 100644
index 000000000..28b7b3924
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams;
+
+/**
+ * Actor that uses HTTP, where the only additional property that an operator needs is a
+ * URL. The actor's parameters must be an {@link HttpActorParams} and its operator
+ * parameters are expected to be an {@link HttpParams}.
+ */
+public class HttpActor extends ActorImpl {
+
+ /**
+ * Constructs the object.
+ *
+ * @param name actor's name
+ */
+ public HttpActor(String name) {
+ super(name);
+ }
+
+ /**
+ * Translates the parameters to an {@link HttpActorParams} and then creates a function
+ * that will extract operator-specific parameters.
+ */
+ @Override
+ protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+ String actorName = getName();
+
+ // @formatter:off
+ return Util.translate(actorName, actorParameters, HttpActorParams.class)
+ .doValidation(actorName)
+ .makeOperationParameters(actorName);
+ // @formatter:on
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java
new file mode 100644
index 000000000..566492907
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java
@@ -0,0 +1,84 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.http.client.HttpClient;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+
+/**
+ * Operator that uses HTTP. The operator's parameters must be a {@link HttpParams}.
+ */
+public class HttpOperator extends OperatorPartial {
+
+ @Getter(AccessLevel.PROTECTED)
+ private HttpClient client;
+
+ @Getter
+ private long timeoutSec;
+
+ /**
+ * URI path for this particular operation.
+ */
+ @Getter
+ private String path;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param actorName name of the actor with which this operator is associated
+ * @param name operation name
+ */
+ public HttpOperator(String actorName, String name) {
+ super(actorName, name);
+ }
+
+ /**
+ * Translates the parameters to an {@link HttpParams} and then extracts the relevant
+ * values.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ HttpParams params = Util.translate(getFullName(), parameters, HttpParams.class);
+ ValidationResult result = params.validate(getFullName());
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+
+ client = getClientFactory().get(params.getClientName());
+ path = params.getPath();
+ timeoutSec = params.getTimeoutSec();
+ }
+
+ // these may be overridden by junits
+
+ protected HttpClientFactory getClientFactory() {
+ return HttpClientFactoryInstance.getClientFactory();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
index 80d8fbd04..df5258d71 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
@@ -20,37 +20,61 @@
package org.onap.policy.controlloop.actorserviceprovider.impl;
-import java.time.Instant;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.Function;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.Policy;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Partial implementation of an operator. Subclasses can choose to simply implement
- * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
- * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
+ * Partial implementation of an operator. In general, it's preferable that subclasses
+ * would override
+ * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome)
+ * startOperationAsync()}. However, if that proves to be too difficult, then they can
+ * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome)
+ * doOperation()}. In addition, if the operation requires any preprocessor steps, the
+ * subclass may choose to override
+ * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}.
+ * <p/>
+ * The futures returned by the methods within this class can be canceled, and will
+ * propagate the cancellation to any subtasks. Thus it is also expected that any futures
+ * returned by overridden methods will do the same. Of course, if a class overrides
+ * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()},
+ * then there's little that can be done to cancel that particular operation.
*/
public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
- private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
- private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
- private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
+ /**
+ * Executor to be used for tasks that may perform blocking I/O. The default executor
+ * simply launches a new thread for each command that is submitted to it.
+ * <p/>
+ * May be overridden by junit tests.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private Executor blockingExecutor = command -> {
+ Thread thread = new Thread(command);
+ thread.setDaemon(true);
+ thread.start();
+ };
@Getter
private final String actorName;
@@ -103,94 +127,52 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
}
@Override
- public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
+ public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) {
if (!isAlive()) {
throw new IllegalStateException("operation is not running: " + getFullName());
}
- final Executor executor = params.getExecutor();
-
// allocate a controller for the entire operation
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
+ CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params);
if (preproc == null) {
// no preprocessor required - just start the operation
return startOperationAttempt(params, controller, 1);
}
- // propagate "stop" to the preprocessor
- controller.add(preproc);
-
/*
* Do preprocessor first and then, if successful, start the operation. Note:
* operations create their own outcome, ignoring the outcome from any previous
* steps.
+ *
+ * Wrap the preprocessor to ensure "stop" is propagated to it.
*/
- preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
- .thenComposeAsync(handleFailure(params, controller), executor)
- .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
- executor);
-
- return controller;
- }
-
- /**
- * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
- * invokes the started and completed call-backs.
- *
- * @param params operation parameters
- * @return a future that will return the preprocessor outcome, or {@code null} if this
- * operation needs no preprocessor
- */
- protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
- logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
-
- final Executor executor = params.getExecutor();
- final ControlLoopOperation operation = params.makeOutcome();
-
- final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
- doPreprocessorAsFuture(params);
- if (preproc == null) {
- // no preprocessor required
- return null;
- }
-
- // allocate a controller for the preprocessor steps
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
-
- /*
- * Don't mark it complete until we've built the whole pipeline. This will prevent
- * the operation from starting until after it has been successfully built (i.e.,
- * without generating any exceptions).
- */
- final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
-
// @formatter:off
- firstFuture
- .thenComposeAsync(controller.add(preproc), executor)
- .exceptionally(fromException(params, operation))
- .whenCompleteAsync(controller.delayedComplete(), executor);
+ controller.wrap(preproc)
+ .exceptionally(fromException(params, "preprocessor of operation"))
+ .thenCompose(handlePreprocessorFailure(params, controller))
+ .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1));
// @formatter:on
- // start the pipeline
- firstFuture.complete(operation);
-
return controller;
}
/**
* Handles a failure in the preprocessor pipeline. If a failure occurred, then it
- * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
- * outcome that it received.
+ * invokes the call-backs, marks the controller complete, and returns an incomplete
+ * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
+ * received.
+ * <p/>
+ * Assumes that no callbacks have been invoked yet.
*
* @param params operation parameters
* @param controller pipeline controller
* @return a function that checks the outcome status and continues, if successful, or
* indicates a failure otherwise
*/
- private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
- ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) {
return outcome -> {
@@ -207,19 +189,21 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
// propagate "stop" to the callbacks
controller.add(callbacks);
- final ControlLoopOperation outcome2 = params.makeOutcome();
+ final OperationOutcome outcome2 = params.makeOutcome();
// TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
- outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
+ outcome2.setResult(PolicyResult.FAILURE_GUARD);
outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
- CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenApplyAsync(callbackCompleted(params, callbacks), executor)
- .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
+ // @formatter:off
+ CompletableFuture.completedFuture(outcome2)
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
.whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
- return controller;
+ return new CompletableFuture<>();
};
}
@@ -237,8 +221,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @return a function that will start the preprocessor and returns its outcome, or
* {@code null} if this operation needs no preprocessor
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
- ControlLoopOperationParams params) {
+ protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
return null;
}
@@ -251,20 +234,12 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param attempt attempt number, typically starting with 1
* @return a future that will return the final result of all attempts
*/
- private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
- PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
-
- final Executor executor = params.getExecutor();
-
- CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
+ private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller, int attempt) {
// propagate "stop" to the operation attempt
- controller.add(future);
-
- // detach when complete
- future.whenCompleteAsync(controller.delayedRemove(future), executor)
- .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
- .whenCompleteAsync(controller.delayedComplete(), executor);
+ controller.wrap(startAttemptWithoutRetries(params, attempt))
+ .thenCompose(retryOnFailure(params, controller, attempt));
return controller;
}
@@ -276,40 +251,32 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param attempt attempt number, typically starting with 1
* @return a future that will return the result of a single operation attempt
*/
- private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
+ private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params,
int attempt) {
logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
final Executor executor = params.getExecutor();
- final ControlLoopOperation outcome = params.makeOutcome();
+ final OperationOutcome outcome = params.makeOutcome();
final CallbackManager callbacks = new CallbackManager();
// this operation attempt gets its own controller
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
// propagate "stop" to the callbacks
controller.add(callbacks);
- /*
- * Don't mark it complete until we've built the whole pipeline. This will prevent
- * the operation from starting until after it has been successfully built (i.e.,
- * without generating any exceptions).
- */
- final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
-
// @formatter:off
- CompletableFuture<ControlLoopOperation> future2 =
- firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
- .thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
+ CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2)));
// @formatter:on
// handle timeouts, if specified
- long timeoutMillis = getTimeOutMillis(params.getPolicy());
+ long timeoutMillis = getTimeOutMillis(params.getTimeoutSec());
if (timeoutMillis > 0) {
logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
- future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
}
/*
@@ -321,16 +288,13 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
*/
// @formatter:off
- future2.exceptionally(fromException(params, outcome))
- .thenApplyAsync(setRetryFlag(params, attempt), executor)
- .thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ future.exceptionally(fromException(params, "operation"))
+ .thenApply(setRetryFlag(params, attempt))
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
.whenCompleteAsync(controller.delayedComplete(), executor);
// @formatter:on
- // start the pipeline
- firstFuture.complete(outcome);
-
return controller;
}
@@ -340,8 +304,8 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param outcome outcome to examine
* @return {@code true} if the outcome was successful
*/
- protected boolean isSuccess(ControlLoopOperation outcome) {
- return OUTCOME_SUCCESS.equals(outcome.getOutcome());
+ protected boolean isSuccess(OperationOutcome outcome) {
+ return (outcome.getResult() == PolicyResult.SUCCESS);
}
/**
@@ -351,13 +315,29 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @return {@code true} if the outcome is not {@code null} and was a failure
* <i>and</i> was associated with this operator, {@code false} otherwise
*/
- protected boolean isActorFailed(ControlLoopOperation outcome) {
- return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
+ protected boolean isActorFailed(OperationOutcome outcome) {
+ return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
+ }
+
+ /**
+ * Determines if the given outcome is for this operation.
+ *
+ * @param outcome outcome to examine
+ * @return {@code true} if the outcome is for this operation, {@code false} otherwise
+ */
+ protected boolean isSameOperation(OperationOutcome outcome) {
+ return OperationOutcome.isFor(outcome, getActorName(), getName());
}
/**
* Invokes the operation as a "future". This method simply invokes
- * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
+ * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor
+ * "blocking executor"}, returning the result via a "future".
+ * <p/>
+ * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
+ * the executor in the "params", as that may bring the background thread pool to a
+ * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
+ * instead.
* <p/>
* This method assumes the following:
* <ul>
@@ -373,31 +353,23 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @return a function that will start the operation and return its result when
* complete
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
- ControlLoopOperationParams params, int attempt) {
-
- /*
- * TODO As doOperation() may perform blocking I/O, this should be launched in its
- * own thread to prevent the ForkJoinPool from being tied up. Should probably
- * provide a method to make that easy.
- */
+ protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt,
+ OperationOutcome outcome) {
- return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
- params.getExecutor());
+ return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor());
}
/**
* Low-level method that performs the operation. This can make the same assumptions
* that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
- * method throws an {@link UnsupportedOperationException}.
+ * particular method simply throws an {@link UnsupportedOperationException}.
*
* @param params operation parameters
* @param attempt attempt number, typically starting with 1
* @param operation the operation being performed
* @return the outcome of the operation
*/
- protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
- ControlLoopOperation operation) {
+ protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) {
throw new UnsupportedOperationException("start operation " + getFullName());
}
@@ -411,8 +383,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param attempt latest attempt number, starting with 1
* @return a function to get the next future to execute
*/
- private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
- int attempt) {
+ private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) {
return operation -> {
if (operation != null && !isActorFailed(operation)) {
@@ -424,22 +395,22 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
}
// get a non-null operation
- ControlLoopOperation oper2;
+ OperationOutcome oper2;
if (operation != null) {
oper2 = operation;
} else {
oper2 = params.makeOutcome();
- oper2.setOutcome(OUTCOME_FAILURE);
+ oper2.setResult(PolicyResult.FAILURE);
}
- if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
- && attempt > params.getPolicy().getRetry()) {
+ Integer retry = params.getRetry();
+ if (retry != null && retry > 0 && attempt > retry) {
/*
* retries were specified and we've already tried them all - change to
* FAILURE_RETRIES
*/
logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
- oper2.setOutcome(OUTCOME_RETRIES);
+ oper2.setResult(PolicyResult.FAILURE_RETRIES);
}
return oper2;
@@ -456,21 +427,24 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param attempt latest attempt number, starting with 1
* @return a function to get the next future to execute
*/
- private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
- ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller,
int attempt) {
return operation -> {
if (!isActorFailed(operation)) {
// wrong type or wrong operation - just leave it as is
logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ controller.complete(operation);
+ return new CompletableFuture<>();
}
- if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
+ Integer retry = params.getRetry();
+ if (retry == null || retry <= 0) {
// no retries - already marked as FAILURE, so just return it
logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ controller.complete(operation);
+ return new CompletableFuture<>();
}
@@ -484,100 +458,279 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
}
/**
- * Gets the outcome of an operation for this operation.
+ * Converts an exception into an operation outcome, returning a copy of the outcome to
+ * prevent background jobs from changing it.
*
- * @param operation operation whose outcome is to be extracted
- * @return the outcome of the given operation, if it's for this operator, {@code null}
- * otherwise
+ * @param params operation parameters
+ * @param type type of item throwing the exception
+ * @return a function that will convert an exception into an operation outcome
*/
- protected String getActorOutcome(ControlLoopOperation operation) {
- if (operation == null) {
- return null;
- }
+ private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) {
- if (!getActorName().equals(operation.getActor())) {
- return null;
- }
+ return thrown -> {
+ OperationOutcome outcome = params.makeOutcome();
+
+ logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+ params.getRequestId(), thrown);
+
+ return setOutcome(params, outcome, thrown);
+ };
+ }
+
+ /**
+ * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+ * any outstanding futures when one completes.
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
+ List<CompletableFuture<OperationOutcome>> futures) {
+
+ // convert list to an array
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures);
+ return result;
+ }
+
+ /**
+ * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
+ * outstanding futures when one completes.
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ final Executor executor = params.getExecutor();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ attachFutures(controller, futures);
+
+ // @formatter:off
+ CompletableFuture.anyOf(futures)
+ .thenApply(object -> (OperationOutcome) object)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ return controller;
+ }
+
+ /**
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
+ * the futures if returned future is canceled. The future returns the "worst" outcome,
+ * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
+ List<CompletableFuture<OperationOutcome>> futures) {
- if (!getName().equals(operation.getOperation())) {
- return null;
+ // convert list to an array
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result = allOf(params, arrFutures);
+ return result;
+ }
+
+ /**
+ * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
+ * futures if returned future is canceled. The future returns the "worst" outcome,
+ * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ attachFutures(controller, futures);
+
+ OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+
+ // record the outcomes of each future when it completes
+ for (int count = 0; count < futures2.length; ++count) {
+ final int count2 = count;
+ futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
}
- return operation.getOutcome();
+ CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes));
+
+ return controller;
}
/**
- * Gets a function that will start the next step, if the current operation was
- * successful, or just return the current operation, otherwise.
+ * Attaches the given futures to the controller.
+ *
+ * @param controller master controller for all of the futures
+ * @param futures futures to be attached to the controller
+ */
+ private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ // attach each task
+ for (CompletableFuture<OperationOutcome> future : futures) {
+ controller.add(future);
+ }
+ }
+
+ /**
+ * Combines the outcomes from a set of tasks.
*
* @param params operation parameters
- * @param nextStep function that will invoke the next step, passing it the operation
- * @return a function that will start the next step
+ * @param future future to be completed with the combined result
+ * @param outcomes outcomes to be examined
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
- ControlLoopOperationParams params,
- Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
+ private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params,
+ CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) {
- return operation -> {
+ return (unused, thrown) -> {
+ if (thrown != null) {
+ future.completeExceptionally(thrown);
+ return;
+ }
- if (operation == null) {
- logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
- ControlLoopOperation outcome = params.makeOutcome();
- outcome.setOutcome(OUTCOME_FAILURE);
- return CompletableFuture.completedFuture(outcome);
+ // identify the outcome with the highest priority
+ OperationOutcome outcome = outcomes[0];
+ int priority = detmPriority(outcome);
- } else if (isSuccess(operation)) {
- logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
- return nextStep.apply(operation);
+ // start with "1", as we've already dealt with "0"
+ for (int count = 1; count < outcomes.length; ++count) {
+ OperationOutcome outcome2 = outcomes[count];
+ int priority2 = detmPriority(outcome2);
- } else {
- logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ if (priority2 > priority) {
+ outcome = outcome2;
+ priority = priority2;
+ }
}
+
+ logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(),
+ (outcome == null ? null : outcome.getResult()), params.getRequestId());
+
+ future.complete(outcome);
};
}
/**
- * Converts an exception into an operation outcome, returning a copy of the outcome to
- * prevent background jobs from changing it.
+ * Determines the priority of an outcome based on its result.
*
- * @param params operation parameters
- * @param operation current operation
- * @return a function that will convert an exception into an operation outcome
+ * @param outcome outcome to examine, or {@code null}
+ * @return the outcome's priority
*/
- private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
- ControlLoopOperation operation) {
+ protected int detmPriority(OperationOutcome outcome) {
+ if (outcome == null) {
+ return 1;
+ }
- return thrown -> {
- logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
- params.getRequestId(), thrown);
+ switch (outcome.getResult()) {
+ case SUCCESS:
+ return 0;
+
+ case FAILURE_GUARD:
+ return 2;
+
+ case FAILURE_RETRIES:
+ return 3;
+
+ case FAILURE:
+ return 4;
+
+ case FAILURE_TIMEOUT:
+ return 5;
+
+ case FAILURE_EXCEPTION:
+ default:
+ return 6;
+ }
+ }
+
+ /**
+ * Performs a task, after verifying that the controller is still running. Also checks
+ * that the previous outcome was successful, if specified.
+ *
+ * @param params operation parameters
+ * @param controller overall pipeline controller
+ * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+ * otherwise
+ * @param outcome outcome of the previous task
+ * @param tasks tasks to be performed
+ * @return a function to perform the task. If everything checks out, then it returns
+ * the task's future. Otherwise, it returns an incomplete future and completes
+ * the controller instead.
+ */
+ // @formatter:off
+ protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller,
+ boolean checkSuccess, OperationOutcome outcome,
+ CompletableFuture<OperationOutcome> task) {
+ // @formatter:on
+ if (checkSuccess && !isSuccess(outcome)) {
/*
- * Must make a copy of the operation, as the original could be changed by
- * background jobs that might still be running.
+ * must complete before canceling so that cancel() doesn't cause controller to
+ * complete
*/
- return setOutcome(params, new ControlLoopOperation(operation), thrown);
- };
+ controller.complete(outcome);
+ task.cancel(false);
+ return new CompletableFuture<>();
+ }
+
+ return controller.wrap(task);
}
/**
- * Gets a function to verify that the operation is still running. If the pipeline is
- * not running, then it returns an incomplete future, which will effectively halt
- * subsequent operations in the pipeline. This method is intended to be used with one
- * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
+ * Performs a task, after verifying that the controller is still running. Also checks
+ * that the previous outcome was successful, if specified.
*
- * @param controller pipeline controller
* @param params operation parameters
- * @return a function to verify that the operation is still running
+ * @param controller overall pipeline controller
+ * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+ * otherwise
+ * @param tasks tasks to be performed
+ * @return a function to perform the task. If everything checks out, then it returns
+ * the task's future. Otherwise, it returns an incomplete future and completes
+ * the controller instead.
*/
- protected <T> Function<T, CompletableFuture<T>> verifyRunning(
- PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
+ // @formatter:off
+ protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller,
+ boolean checkSuccess,
+ Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
+ // @formatter:on
+
+ return outcome -> {
+
+ if (!controller.isRunning()) {
+ return new CompletableFuture<>();
+ }
- return value -> {
- boolean running = controller.isRunning();
- logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
+ if (checkSuccess && !isSuccess(outcome)) {
+ controller.complete(outcome);
+ return new CompletableFuture<>();
+ }
- return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
+ return controller.wrap(task.apply(outcome));
};
}
@@ -591,10 +744,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param callbacks used to determine if the start callback can be invoked
* @return a function that sets the start time and invokes the callback
*/
- private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
+ private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params,
CallbackManager callbacks) {
- return outcome -> {
+ return (outcome, thrown) -> {
if (callbacks.canStart()) {
// haven't invoked "start" callback yet
@@ -602,8 +755,6 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
outcome.setEnd(null);
params.callbackStarted(outcome);
}
-
- return outcome;
};
}
@@ -621,18 +772,16 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param callbacks used to determine if the end callback can be invoked
* @return a function that sets the end time and invokes the callback
*/
- private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
+ private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params,
CallbackManager callbacks) {
- return operation -> {
+ return (outcome, thrown) -> {
if (callbacks.canEnd()) {
- operation.setStart(callbacks.getStartTime());
- operation.setEnd(callbacks.getEndTime());
- params.callbackCompleted(operation);
+ outcome.setStart(callbacks.getStartTime());
+ outcome.setEnd(callbacks.getEndTime());
+ params.callbackCompleted(outcome);
}
-
- return operation;
};
}
@@ -643,7 +792,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param operation operation to be updated
* @return the updated operation
*/
- protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
Throwable thrown) {
PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
return setOutcome(params, operation, result);
@@ -657,10 +806,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param result result of the operation
* @return the updated operation
*/
- protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
PolicyResult result) {
logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
- operation.setOutcome(result.toString());
+ operation.setResult(result);
operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
: ControlLoopOperation.FAILED_MSG);
@@ -687,71 +836,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* Gets the operation timeout. Subclasses may override this method to obtain the
* timeout in some other way (e.g., through configuration properties).
*
- * @param policy policy from which to extract the timeout
+ * @param timeoutSec timeout, in seconds, or {@code null}
* @return the operation timeout, in milliseconds
*/
- protected long getTimeOutMillis(Policy policy) {
- Integer timeoutSec = policy.getTimeout();
+ protected long getTimeOutMillis(Integer timeoutSec) {
return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
}
-
- /**
- * Manager for "start" and "end" callbacks.
- */
- private static class CallbackManager implements Runnable {
- private final AtomicReference<Instant> startTime = new AtomicReference<>();
- private final AtomicReference<Instant> endTime = new AtomicReference<>();
-
- /**
- * Determines if the "start" callback can be invoked. If so, it sets the
- * {@link #startTime} to the current time.
- *
- * @return {@code true} if the "start" callback can be invoked, {@code false}
- * otherwise
- */
- public boolean canStart() {
- return startTime.compareAndSet(null, Instant.now());
- }
-
- /**
- * Determines if the "end" callback can be invoked. If so, it sets the
- * {@link #endTime} to the current time.
- *
- * @return {@code true} if the "end" callback can be invoked, {@code false}
- * otherwise
- */
- public boolean canEnd() {
- return endTime.compareAndSet(null, Instant.now());
- }
-
- /**
- * Gets the start time.
- *
- * @return the start time, or {@code null} if {@link #canStart()} has not been
- * invoked yet.
- */
- public Instant getStartTime() {
- return startTime.get();
- }
-
- /**
- * Gets the end time.
- *
- * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
- * yet.
- */
- public Instant getEndTime() {
- return endTime.get();
- }
-
- /**
- * Prevents further callbacks from being executed by setting {@link #startTime}
- * and {@link #endTime}.
- */
- @Override
- public void run() {
- canStart();
- canEnd();
- }
- }
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
index 08aba81f2..57fce40d7 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
@@ -20,6 +20,7 @@
package org.onap.policy.controlloop.actorserviceprovider.parameters;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -32,11 +33,11 @@ import lombok.Getter;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.common.parameters.BeanValidator;
import org.onap.policy.common.parameters.annotations.NotNull;
-import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Util;
import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
-import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.Target;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,36 +50,67 @@ import org.slf4j.LoggerFactory;
@AllArgsConstructor
@EqualsAndHashCode
public class ControlLoopOperationParams {
-
private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class);
- public static final String UNKNOWN = "-unknown-";
-
+ /**
+ * Actor name.
+ */
+ @NotNull
+ private String actor;
/**
- * The actor service in which to find the actor/operation.
+ * Actor service in which to find the actor/operation.
*/
@NotNull
private ActorService actorService;
/**
- * The event for which the operation applies.
+ * Event for which the operation applies.
*/
@NotNull
private ControlLoopEventContext context;
/**
- * The executor to use to run the operation.
+ * Executor to use to run the operation.
*/
@NotNull
@Builder.Default
private Executor executor = ForkJoinPool.commonPool();
/**
- * The policy associated with the operation.
+ * Operation name.
+ */
+ @NotNull
+ private String operation;
+
+ /**
+ * Payload data for the request.
+ */
+ private Map<String, String> payload;
+
+ /**
+ * Number of retries allowed, or {@code null} if no retries.
+ */
+ private Integer retry;
+
+ /**
+ * The entity's target information. May be {@code null}, depending on the requirement
+ * of the operation to be invoked.
+ */
+ private Target target;
+
+ /**
+ * Target entity.
*/
@NotNull
- private Policy policy;
+ private String targetEntity;
+
+ /**
+ * Timeout, in seconds, or {@code null} if no timeout. Zero and negative values also
+ * imply no timeout.
+ */
+ @Builder.Default
+ private Integer timeoutSec = 300;
/**
* The function to invoke when the operation starts. This is optional.
@@ -87,7 +119,7 @@ public class ControlLoopOperationParams {
* may happen if the current operation requires other operations to be performed first
* (e.g., A&AI queries, guard checks).
*/
- private Consumer<ControlLoopOperation> startCallback;
+ private Consumer<OperationOutcome> startCallback;
/**
* The function to invoke when the operation completes. This is optional.
@@ -96,13 +128,7 @@ public class ControlLoopOperationParams {
* may happen if the current operation requires other operations to be performed first
* (e.g., A&AI queries, guard checks).
*/
- private Consumer<ControlLoopOperation> completeCallback;
-
- /**
- * Target entity.
- */
- @NotNull
- private String target;
+ private Consumer<OperationOutcome> completeCallback;
/**
* Starts the specified operation.
@@ -110,7 +136,7 @@ public class ControlLoopOperationParams {
* @return a future that will return the result of the operation
* @throws IllegalArgumentException if the parameters are invalid
*/
- public CompletableFuture<ControlLoopOperation> start() {
+ public CompletableFuture<OperationOutcome> start() {
BeanValidationResult result = validate();
if (!result.isValid()) {
logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(),
@@ -120,31 +146,13 @@ public class ControlLoopOperationParams {
// @formatter:off
return actorService
- .getActor(policy.getActor())
- .getOperator(policy.getRecipe())
+ .getActor(getActor())
+ .getOperator(getOperation())
.startOperation(this);
// @formatter:on
}
/**
- * Gets the name of the actor from the policy.
- *
- * @return the actor name, or {@link #UNKNOWN} if no name is available
- */
- public String getActor() {
- return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor());
- }
-
- /**
- * Gets the name of the operation from the policy.
- *
- * @return the operation name, or {@link #UNKNOWN} if no name is available
- */
- public String getOperation() {
- return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe());
- }
-
- /**
* Gets the requested ID of the associated event.
*
* @return the event's request ID, or {@code null} if no request ID is available
@@ -158,13 +166,13 @@ public class ControlLoopOperationParams {
*
* @return a new operation outcome
*/
- public ControlLoopOperation makeOutcome() {
- ControlLoopOperation operation = new ControlLoopOperation();
- operation.setActor(getActor());
- operation.setOperation(getOperation());
- operation.setTarget(target);
+ public OperationOutcome makeOutcome() {
+ OperationOutcome outcome = new OperationOutcome();
+ outcome.setActor(getActor());
+ outcome.setOperation(getOperation());
+ outcome.setTarget(targetEntity);
- return operation;
+ return outcome;
}
/**
@@ -173,11 +181,11 @@ public class ControlLoopOperationParams {
*
* @param operation the operation that is being started
*/
- public void callbackStarted(ControlLoopOperation operation) {
+ public void callbackStarted(OperationOutcome operation) {
logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId());
if (startCallback != null) {
- Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}",
+ Util.runFunction(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}",
operation.getActor(), operation.getOperation(), getRequestId());
}
}
@@ -188,12 +196,12 @@ public class ControlLoopOperationParams {
*
* @param operation the operation that is being started
*/
- public void callbackCompleted(ControlLoopOperation operation) {
+ public void callbackCompleted(OperationOutcome operation) {
logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(),
- operation.getOutcome(), getRequestId());
+ operation.getResult(), getRequestId());
if (completeCallback != null) {
- Util.logException(() -> completeCallback.accept(operation),
+ Util.runFunction(() -> completeCallback.accept(operation),
"{}.{}: complete-callback threw an exception for {}", operation.getActor(),
operation.getOperation(), getRequestId());
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
index d34a3fb5b..1d64a8710 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
@@ -101,7 +101,7 @@ public class ListenerManager {
*/
protected void runListener(Runnable listener) {
// TODO do this asynchronously?
- Util.logException(listener, "pipeline listener {} threw an exception", listener);
+ Util.runFunction(listener, "pipeline listener {} threw an exception", listener);
}
/**
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
index 96c8f9e05..92843e28a 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
@@ -23,41 +23,70 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline;
import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Pipeline controller, used by operations within the pipeline to determine if they should
- * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
- * pipeline.
+ * continue to run. Whenever this is canceled or completed, it automatically cancels all
+ * futures and runs all listeners that have been added.
*/
@NoArgsConstructor
public class PipelineControllerFuture<T> extends CompletableFuture<T> {
private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
+ private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception";
+ private static final String CANCEL_MSG = "{}: cancel future";
+ private static final String COMPLETE_MSG = "{}: complete future";
+
/**
* Tracks items added to this controller via one of the <i>add</i> methods.
*/
private final FutureManager futures = new FutureManager();
- /**
- * Cancels and stops the pipeline, in that order.
- */
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- try {
- logger.trace("{}: cancel future", ident(this));
- return super.cancel(mayInterruptIfRunning);
+ return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this));
+ }
- } finally {
- futures.stop();
- }
+ @Override
+ public boolean complete(T value) {
+ return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this));
+ }
+
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this));
+ }
+
+ @Override
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) {
+ return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor);
+ }
+
+ @Override
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
+ return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)));
+ }
+
+ @Override
+ public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {
+ logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit);
+ return super.completeOnTimeout(value, timeout, unit);
+ }
+
+ @Override
+ public <U> PipelineControllerFuture<U> newIncompleteFuture() {
+ return new PipelineControllerFuture<>();
}
/**
@@ -67,11 +96,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
*
* @return a function that removes the given future
*/
- public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
- return (value, thrown) -> {
- logger.trace("{}: remove future {}", ident(this), ident(future));
- remove(future);
- };
+ public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) {
+ return (value, thrown) -> remove(future);
}
/**
@@ -81,11 +107,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
*
* @return a function that removes the given listener
*/
- public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
- return (value, thrown) -> {
- logger.trace("{}: remove listener {}", ident(this), ident(listener));
- remove(listener);
- };
+ public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) {
+ return (value, thrown) -> remove(listener);
}
/**
@@ -98,25 +121,43 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
public BiConsumer<T, Throwable> delayedComplete() {
return (value, thrown) -> {
if (thrown == null) {
- logger.trace("{}: complete and stop future", ident(this));
complete(value);
} else {
- logger.trace("{}: complete exceptionally and stop future", ident(this));
completeExceptionally(thrown);
}
-
- futures.stop();
};
}
/**
+ * Adds a future to the controller and arranges for it to be removed from the
+ * controller when it completes, whether or not it throws an exception. If the
+ * controller has already been stopped, then the future is canceled and a new,
+ * incomplete future is returned.
+ *
+ * @param future future to be wrapped
+ * @return a new future
+ */
+ public CompletableFuture<T> wrap(CompletableFuture<T> future) {
+ if (!isRunning()) {
+ logger.trace("{}: not running, skipping next task {}", ident(this), ident(future));
+ future.cancel(false);
+ return new CompletableFuture<>();
+ }
+
+ add(future);
+ return future.whenComplete(this.delayedRemove(future));
+ }
+
+ /**
* Adds a function whose return value is to be canceled when this controller is
* stopped. Note: if the controller is already stopped, then the function will
* <i>not</i> be executed.
*
- * @param futureMaker function to be invoked in the future
+ * @param futureMaker function to be invoked to create the future
+ * @return a function to create the future and arrange for it to be managed by this
+ * controller
*/
- public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
+ public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) {
return input -> {
if (!isRunning()) {
@@ -127,7 +168,7 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
CompletableFuture<F> future = futureMaker.apply(input);
add(future);
- return future;
+ return future.whenComplete(delayedRemove(future));
};
}
@@ -154,4 +195,26 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
logger.trace("{}: remove listener {}", ident(this), ident(listener));
futures.remove(listener);
}
+
+ /**
+ * Performs an operation, stops the futures, and returns the value from the operation.
+ * Logs a message using the given arguments.
+ *
+ *
+ * @param <R> type of value to be returned
+ * @param supplier operation to perform
+ * @param message message to be logged
+ * @param args message arguments to fill "{}" place-holders
+ * @return the operation's result
+ */
+ private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) {
+ try {
+ logger.trace(message, args);
+ return supplier.get();
+
+ } finally {
+ logger.trace("{}: stopping this future", ident(this));
+ futures.stop();
+ }
+ }
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java
new file mode 100644
index 000000000..31c6d2077
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandlerTest.java
@@ -0,0 +1,172 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.policy.PolicyResult;
+
+public class AsyncResponseHandlerTest {
+
+ private static final String ACTOR = "my-actor";
+ private static final String OPERATION = "my-operation";
+ private static final UUID REQ_ID = UUID.randomUUID();
+ private static final String TEXT = "some text";
+
+ private VirtualControlLoopEvent event;
+ private ControlLoopEventContext context;
+ private ControlLoopOperationParams params;
+ private OperationOutcome outcome;
+ private MyHandler handler;
+
+ /**
+ * Initializes all fields, including {@link #handler}.
+ */
+ @Before
+ public void setUp() {
+ event = new VirtualControlLoopEvent();
+ event.setRequestId(REQ_ID);
+
+ context = new ControlLoopEventContext(event);
+ params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).context(context).build();
+ outcome = params.makeOutcome();
+
+ handler = new MyHandler(params, outcome);
+ }
+
+ @Test
+ public void testAsyncResponseHandler_testGetParams_testGetOutcome() {
+ assertSame(params, handler.getParams());
+ assertSame(outcome, handler.getOutcome());
+ }
+
+ @Test
+ public void testHandle() {
+ CompletableFuture<String> future = new CompletableFuture<>();
+ handler.handle(future).complete(outcome);
+
+ assertTrue(future.isCancelled());
+ }
+
+ @Test
+ public void testCompleted() throws Exception {
+ CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>());
+ handler.completed(TEXT);
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+ assertEquals(PolicyResult.FAILURE_RETRIES, outcome.getResult());
+ assertEquals(TEXT, outcome.getMessage());
+ }
+
+ /**
+ * Tests completed() when doCompleted() throws an exception.
+ */
+ @Test
+ public void testCompletedException() throws Exception {
+ IllegalStateException except = new IllegalStateException();
+
+ outcome = params.makeOutcome();
+ handler = new MyHandler(params, outcome) {
+ @Override
+ protected OperationOutcome doComplete(String rawResponse) {
+ throw except;
+ }
+ };
+
+ CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>());
+ handler.completed(TEXT);
+ assertTrue(result.isCompletedExceptionally());
+
+ AtomicReference<Throwable> thrown = new AtomicReference<>();
+ result.whenComplete((unused, thrown2) -> thrown.set(thrown2));
+
+ assertSame(except, thrown.get());
+ }
+
+ @Test
+ public void testFailed() throws Exception {
+ IllegalStateException except = new IllegalStateException();
+
+ CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>());
+ handler.failed(except);
+
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+ assertEquals(PolicyResult.FAILURE_GUARD, outcome.getResult());
+ }
+
+ /**
+ * Tests failed() when doFailed() throws an exception.
+ */
+ @Test
+ public void testFailedException() throws Exception {
+ IllegalStateException except = new IllegalStateException();
+
+ outcome = params.makeOutcome();
+ handler = new MyHandler(params, outcome) {
+ @Override
+ protected OperationOutcome doFailed(Throwable thrown) {
+ throw except;
+ }
+ };
+
+ CompletableFuture<OperationOutcome> result = handler.handle(new CompletableFuture<>());
+ handler.failed(except);
+ assertTrue(result.isCompletedExceptionally());
+
+ AtomicReference<Throwable> thrown = new AtomicReference<>();
+ result.whenComplete((unused, thrown2) -> thrown.set(thrown2));
+
+ assertSame(except, thrown.get());
+ }
+
+ private class MyHandler extends AsyncResponseHandler<String> {
+
+ public MyHandler(ControlLoopOperationParams params, OperationOutcome outcome) {
+ super(params, outcome);
+ }
+
+ @Override
+ protected OperationOutcome doComplete(String rawResponse) {
+ OperationOutcome outcome = getOutcome();
+ outcome.setResult(PolicyResult.FAILURE_RETRIES);
+ outcome.setMessage(rawResponse);
+ return outcome;
+ }
+
+ @Override
+ protected OperationOutcome doFailed(Throwable thrown) {
+ OperationOutcome outcome = getOutcome();
+ outcome.setResult(PolicyResult.FAILURE_GUARD);
+ return outcome;
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java
new file mode 100644
index 000000000..44606cb14
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManagerTest.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CallbackManagerTest {
+
+ private CallbackManager mgr;
+
+ @Before
+ public void setUp() {
+ mgr = new CallbackManager();
+ }
+
+ @Test
+ public void testCanStart_testGetStartTime() {
+ // null until canXxx() is called
+ assertNull(mgr.getStartTime());
+
+ assertTrue(mgr.canStart());
+
+ Instant time = mgr.getStartTime();
+ assertNotNull(time);
+ assertNull(mgr.getEndTime());
+
+ // false for now on
+ assertFalse(mgr.canStart());
+ assertFalse(mgr.canStart());
+
+ assertEquals(time, mgr.getStartTime());
+ }
+
+ @Test
+ public void testCanEnd_testGetEndTime() {
+ // null until canXxx() is called
+ assertNull(mgr.getEndTime());
+ assertNull(mgr.getEndTime());
+
+ assertTrue(mgr.canEnd());
+
+ Instant time = mgr.getEndTime();
+ assertNotNull(time);
+ assertNull(mgr.getStartTime());
+
+ // false for now on
+ assertFalse(mgr.canEnd());
+ assertFalse(mgr.canEnd());
+
+ assertEquals(time, mgr.getEndTime());
+ }
+
+ @Test
+ public void testRun() {
+ mgr.run();
+
+ assertNotNull(mgr.getStartTime());
+ assertNotNull(mgr.getEndTime());
+
+ assertFalse(mgr.canStart());
+ assertFalse(mgr.canEnd());
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java
new file mode 100644
index 000000000..4e9728336
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcomeTest.java
@@ -0,0 +1,137 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.policy.PolicyResult;
+
+public class OperationOutcomeTest {
+ private static final String ACTOR = "my-actor";
+ private static final String OPERATION = "my-operation";
+ private static final String TARGET = "my-target";
+ private static final Instant START = Instant.ofEpochMilli(10);
+ private static final Instant END = Instant.ofEpochMilli(20);
+ private static final String SUB_REQ_ID = "my-sub-request-id";
+ private static final PolicyResult RESULT = PolicyResult.FAILURE_GUARD;
+ private static final String MESSAGE = "my-message";
+
+ private OperationOutcome outcome;
+
+ @Before
+ public void setUp() {
+ outcome = new OperationOutcome();
+ }
+
+ @Test
+ public void testOperationOutcomeOperationOutcome() {
+ setAll();
+
+ OperationOutcome outcome2 = new OperationOutcome(outcome);
+
+ assertEquals(ACTOR, outcome2.getActor());
+ assertEquals(OPERATION, outcome2.getOperation());
+ assertEquals(TARGET, outcome2.getTarget());
+ assertEquals(START, outcome2.getStart());
+ assertEquals(END, outcome2.getEnd());
+ assertEquals(SUB_REQ_ID, outcome2.getSubRequestId());
+ assertEquals(RESULT, outcome2.getResult());
+ assertEquals(MESSAGE, outcome2.getMessage());
+ }
+
+ @Test
+ public void testToControlLoopOperation() {
+ setAll();
+
+ ControlLoopOperation outcome2 = outcome.toControlLoopOperation();
+
+ assertEquals(ACTOR, outcome2.getActor());
+ assertEquals(OPERATION, outcome2.getOperation());
+ assertEquals(TARGET, outcome2.getTarget());
+ assertEquals(START, outcome2.getStart());
+ assertEquals(END, outcome2.getEnd());
+ assertEquals(SUB_REQ_ID, outcome2.getSubRequestId());
+ assertEquals(RESULT.toString(), outcome2.getOutcome());
+ assertEquals(MESSAGE, outcome2.getMessage());
+ }
+
+ /**
+ * Tests both isFor() methods, as one invokes the other.
+ */
+ @Test
+ public void testIsFor() {
+ setAll();
+
+ // null case
+ assertFalse(OperationOutcome.isFor(null, ACTOR, OPERATION));
+
+ // actor mismatch
+ assertFalse(OperationOutcome.isFor(outcome, TARGET, OPERATION));
+
+ // operation mismatch
+ assertFalse(OperationOutcome.isFor(outcome, ACTOR, TARGET));
+
+ // null actor in outcome
+ outcome.setActor(null);
+ assertFalse(OperationOutcome.isFor(outcome, ACTOR, OPERATION));
+ outcome.setActor(ACTOR);
+
+ // null operation in outcome
+ outcome.setOperation(null);
+ assertFalse(OperationOutcome.isFor(outcome, ACTOR, OPERATION));
+ outcome.setOperation(OPERATION);
+
+ // null actor argument
+ assertThatThrownBy(() -> outcome.isFor(null, OPERATION));
+
+ // null operation argument
+ assertThatThrownBy(() -> outcome.isFor(ACTOR, null));
+
+ // true case
+ assertTrue(OperationOutcome.isFor(outcome, ACTOR, OPERATION));
+ }
+
+ @Test
+ public void testSetResult() {
+ outcome.setResult(PolicyResult.FAILURE_EXCEPTION);
+ assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
+
+ assertThatThrownBy(() -> outcome.setResult(null));
+ }
+
+ private void setAll() {
+ outcome.setActor(ACTOR);
+ outcome.setEnd(END);
+ outcome.setMessage(MESSAGE);
+ outcome.setOperation(OPERATION);
+ outcome.setResult(RESULT);
+ outcome.setStart(START);
+ outcome.setSubRequestId(SUB_REQ_ID);
+ outcome.setTarget(TARGET);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java
index c652e8374..4a3f321cf 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/UtilTest.java
@@ -27,15 +27,56 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import ch.qos.logback.classic.Logger;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Builder;
import lombok.Data;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.slf4j.LoggerFactory;
public class UtilTest {
+ private static final String MY_REQUEST = "my-request";
+ private static final String URL = "my-url";
+ private static final String OUT_URL = "OUT|REST|my-url";
+ private static final String IN_URL = "IN|REST|my-url";
+ protected static final String EXPECTED_EXCEPTION = "expected exception";
+
+ /**
+ * Used to attach an appender to the class' logger.
+ */
+ private static final Logger logger = (Logger) LoggerFactory.getLogger(Util.class);
+ private static final ExtractAppender appender = new ExtractAppender();
+
+ /**
+ * Initializes statics.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ appender.setContext(logger.getLoggerContext());
+ appender.start();
+
+ logger.addAppender(appender);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ appender.stop();
+ }
+
+ @Before
+ public void setUp() {
+ appender.clearExtractions();
+ }
@Test
public void testIdent() {
@@ -48,11 +89,88 @@ public class UtilTest {
}
@Test
- public void testLogException() {
+ public void testLogRestRequest() throws CoderException {
+ // log structured data
+ appender.clearExtractions();
+ Util.logRestRequest(URL, new Abc(10, null, null));
+ List<String> output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(OUT_URL).contains("{\n \"intValue\": 10\n}");
+
+ // log a plain string
+ appender.clearExtractions();
+ Util.logRestRequest(URL, MY_REQUEST);
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(OUT_URL).contains(MY_REQUEST);
+
+ // exception from coder
+ StandardCoder coder = new StandardCoder() {
+ @Override
+ public String encode(Object object, boolean pretty) throws CoderException {
+ throw new CoderException(EXPECTED_EXCEPTION);
+ }
+ };
+
+ appender.clearExtractions();
+ Util.logRestRequest(coder, URL, new Abc(11, null, null));
+ output = appender.getExtracted();
+ assertEquals(2, output.size());
+ assertThat(output.get(0)).contains("cannot pretty-print request");
+ assertThat(output.get(1)).contains(OUT_URL);
+ }
+
+ @Test
+ public void testLogRestResponse() throws CoderException {
+ // log structured data
+ appender.clearExtractions();
+ Util.logRestResponse(URL, new Abc(10, null, null));
+ List<String> output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(IN_URL).contains("{\n \"intValue\": 10\n}");
+
+ // log null response
+ appender.clearExtractions();
+ Util.logRestResponse(URL, null);
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(IN_URL).contains("null");
+
+ // log a plain string
+ appender.clearExtractions();
+ Util.logRestResponse(URL, MY_REQUEST);
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(IN_URL).contains(MY_REQUEST);
+
+ // exception from coder
+ StandardCoder coder = new StandardCoder() {
+ @Override
+ public String encode(Object object, boolean pretty) throws CoderException {
+ throw new CoderException(EXPECTED_EXCEPTION);
+ }
+ };
+
+ appender.clearExtractions();
+ Util.logRestResponse(coder, URL, new Abc(11, null, null));
+ output = appender.getExtracted();
+ assertEquals(2, output.size());
+ assertThat(output.get(0)).contains("cannot pretty-print response");
+ assertThat(output.get(1)).contains(IN_URL);
+ }
+
+ @Test
+ public void testRunFunction() {
// no exception, no log
AtomicInteger count = new AtomicInteger();
- Util.logException(() -> count.incrementAndGet(), "no error");
+ Util.runFunction(() -> count.incrementAndGet(), "no error");
assertEquals(1, count.get());
+ assertEquals(0, appender.getExtracted().size());
// with an exception
Runnable runnable = () -> {
@@ -60,8 +178,17 @@ public class UtilTest {
throw new IllegalStateException("expected exception");
};
- Util.logException(runnable, "error with no args");
- Util.logException(runnable, "error {} {} arg(s)", "with", 1);
+ appender.clearExtractions();
+ Util.runFunction(runnable, "error with no args");
+ List<String> output = appender.getExtracted();
+ assertEquals(1, output.size());
+ assertThat(output.get(0)).contains("error with no args");
+
+ appender.clearExtractions();
+ Util.runFunction(runnable, "error {} {} arg(s)", "with", 2);
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+ assertThat(output.get(0)).contains("error with 2 arg(s)");
}
@Test
@@ -123,4 +250,14 @@ public class UtilTest {
private int intValue;
private String strValue;
}
+
+ // throws an exception when getXxx() is used
+ public static class DataWithException {
+ @SuppressWarnings("unused")
+ private int intValue;
+
+ public int getIntValue() {
+ throw new IllegalStateException();
+ }
+ }
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java
index fcc3fb12e..0d917ad3e 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java
@@ -20,28 +20,55 @@
package org.onap.policy.controlloop.actorserviceprovider.controlloop;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
+import java.util.Map;
+import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
public class ControlLoopEventContextTest {
+ private static final UUID REQ_ID = UUID.randomUUID();
+ private Map<String, String> enrichment;
private VirtualControlLoopEvent event;
private ControlLoopEventContext context;
+ /**
+ * Initializes data, including {@link #context}.
+ */
@Before
public void setUp() {
+ enrichment = Map.of("abc", "one", "def", "two");
+
event = new VirtualControlLoopEvent();
+ event.setRequestId(REQ_ID);
+ event.setAai(enrichment);
+
context = new ControlLoopEventContext(event);
}
@Test
public void testControlLoopEventContext() {
assertSame(event, context.getEvent());
+ assertSame(REQ_ID, context.getRequestId());
+ assertEquals(enrichment, context.getEnrichment());
+
+ // null event
+ assertThatThrownBy(() -> new ControlLoopEventContext(null));
+
+ // no request id, no enrichment data
+ event.setRequestId(null);
+ event.setAai(null);
+ context = new ControlLoopEventContext(event);
+ assertSame(event, context.getEvent());
+ assertNotNull(context.getRequestId());
+ assertEquals(Map.of(), context.getEnrichment());
}
@Test
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java
index 7e0c35a3f..a209fb0d8 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImplTest.java
@@ -34,7 +34,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
@@ -192,10 +191,10 @@ public class ActorImplTest {
}
@Test
- public void testSetOperators() {
- // cannot set operators if already configured
+ public void testAddOperator() {
+ // cannot add operators if already configured
actor.configure(params);
- assertThatIllegalStateException().isThrownBy(() -> actor.setOperators(Collections.emptyList()));
+ assertThatIllegalStateException().isThrownBy(() -> actor.addOperator(oper1));
/*
* make an actor where operators two and four have names that are duplicates of
@@ -367,7 +366,13 @@ public class ActorImplTest {
* @return a new actor
*/
private ActorImpl makeActor(Operator... operators) {
- return new ActorImpl(ACTOR_NAME, operators);
+ ActorImpl actor = new ActorImpl(ACTOR_NAME);
+
+ for (Operator oper : operators) {
+ actor.addOperator(oper);
+ }
+
+ return actor;
}
private static class MyOper extends OperatorPartial implements Operator {
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java
new file mode 100644
index 000000000..2da789989
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+
+public class HttpActorTest {
+
+ private static final String ACTOR = "my-actor";
+ private static final String UNKNOWN = "unknown";
+ private static final String CLIENT = "my-client";
+ private static final long TIMEOUT = 10L;
+
+ private HttpActor actor;
+
+ @Before
+ public void setUp() {
+ actor = new HttpActor(ACTOR);
+ }
+
+ @Test
+ public void testMakeOperatorParameters() {
+ HttpActorParams params = new HttpActorParams();
+ params.setClientName(CLIENT);
+ params.setTimeoutSec(TIMEOUT);
+ params.setPath(Map.of("operA", "urlA", "operB", "urlB"));
+
+ final HttpActor prov = new HttpActor(ACTOR);
+ Function<String, Map<String, Object>> maker =
+ prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params));
+
+ assertNull(maker.apply(UNKNOWN));
+
+ // use a TreeMap to ensure the properties are sorted
+ assertEquals("{clientName=my-client, path=urlA, timeoutSec=10}",
+ new TreeMap<>(maker.apply("operA")).toString());
+
+ assertEquals("{clientName=my-client, path=urlB, timeoutSec=10}",
+ new TreeMap<>(maker.apply("operB")).toString());
+
+ // with invalid actor parameters
+ params.setClientName(null);
+ assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params)))
+ .isInstanceOf(ParameterValidationRuntimeException.class);
+ }
+
+ @Test
+ public void testHttpActor() {
+ assertEquals(ACTOR, actor.getName());
+ assertEquals(ACTOR, actor.getFullName());
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java
new file mode 100644
index 000000000..c006cf333
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperatorTest.java
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.http.client.HttpClient;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+
+public class HttpOperatorTest {
+
+ private static final String ACTOR = "my-actor";
+ private static final String OPERATION = "my-name";
+ private static final String CLIENT = "my-client";
+ private static final String PATH = "my-path";
+ private static final long TIMEOUT = 100;
+
+ @Mock
+ private HttpClient client;
+
+ private HttpOperator oper;
+
+ /**
+ * Initializes fields, including {@link #oper}.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ oper = new HttpOperator(ACTOR, OPERATION);
+ }
+
+ @Test
+ public void testDoConfigureMapOfStringObject_testGetClient_testGetPath_testGetTimeoutSec() {
+ assertNull(oper.getClient());
+ assertNull(oper.getPath());
+ assertEquals(0L, oper.getTimeoutSec());
+
+ oper = new HttpOperator(ACTOR, OPERATION) {
+ @Override
+ protected HttpClientFactory getClientFactory() {
+ HttpClientFactory factory = mock(HttpClientFactory.class);
+ when(factory.get(CLIENT)).thenReturn(client);
+ return factory;
+ }
+ };
+
+ HttpParams params = HttpParams.builder().clientName(CLIENT).path(PATH).timeoutSec(TIMEOUT).build();
+ Map<String, Object> paramMap = Util.translateToMap(OPERATION, params);
+ oper.configure(paramMap);
+
+ assertSame(client, oper.getClient());
+ assertEquals(PATH, oper.getPath());
+ assertEquals(TIMEOUT, oper.getTimeoutSec());
+
+ // test invalid parameters
+ paramMap.remove("path");
+ assertThatThrownBy(() -> oper.configure(paramMap)).isInstanceOf(ParameterValidationRuntimeException.class);
+ }
+
+ @Test
+ public void testHttpOperator() {
+ assertEquals(ACTOR, oper.getActorName());
+ assertEquals(OPERATION, oper.getName());
+ assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
+ }
+
+ @Test
+ public void testGetClient() {
+ assertNotNull(oper.getClientFactory());
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java
index 864ac829a..21bc656f2 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartialTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.time.Instant;
@@ -36,17 +37,20 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Queue;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -58,9 +62,10 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.onap.policy.controlloop.policy.PolicyResult;
public class OperatorPartialTest {
@@ -75,14 +80,10 @@ public class OperatorPartialTest {
private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
.filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
- private static final List<String> FAILURE_STRINGS =
- FAILURE_RESULTS.stream().map(Object::toString).collect(Collectors.toList());
-
private VirtualControlLoopEvent event;
private Map<String, Object> config;
private ControlLoopEventContext context;
private MyExec executor;
- private Policy policy;
private ControlLoopOperationParams params;
private MyOper oper;
@@ -92,8 +93,8 @@ public class OperatorPartialTest {
private Instant tstart;
- private ControlLoopOperation opstart;
- private ControlLoopOperation opend;
+ private OperationOutcome opstart;
+ private OperationOutcome opend;
/**
* Initializes the fields, including {@link #oper}.
@@ -107,13 +108,9 @@ public class OperatorPartialTest {
context = new ControlLoopEventContext(event);
executor = new MyExec();
- policy = new Policy();
- policy.setActor(ACTOR);
- policy.setRecipe(OPERATOR);
- policy.setTimeout(TIMEOUT);
-
params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
- .executor(executor).policy(policy).startCallback(this::starter).target(TARGET).build();
+ .executor(executor).actor(ACTOR).operation(OPERATOR).timeoutSec(TIMEOUT)
+ .startCallback(this::starter).targetEntity(TARGET).build();
oper = new MyOper();
oper.configure(new TreeMap<>());
@@ -133,6 +130,31 @@ public class OperatorPartialTest {
}
@Test
+ public void testGetBlockingExecutor() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ /*
+ * Use an operator that doesn't override getBlockingExecutor().
+ */
+ OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {};
+ oper2.getBlockingExecutor().execute(() -> latch.countDown());
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testDoConfigure() {
+ oper = spy(new MyOper());
+
+ oper.configure(config);
+ verify(oper).configure(config);
+
+ // repeat - SHOULD be run again
+ oper.configure(config);
+ verify(oper, times(2)).configure(config);
+ }
+
+ @Test
public void testDoStart() {
oper = spy(new MyOper());
@@ -181,7 +203,7 @@ public class OperatorPartialTest {
}
@Test
- public void testStartOperation_testVerifyRunning() {
+ public void testStartOperation() {
verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS);
}
@@ -201,17 +223,13 @@ public class OperatorPartialTest {
* Tests startOperation() when the operation has a preprocessor.
*/
@Test
- public void testStartOperationWithPreprocessor_testStartPreprocessor() {
+ public void testStartOperationWithPreprocessor() {
AtomicInteger count = new AtomicInteger();
- // @formatter:off
- Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
- oper -> CompletableFuture.supplyAsync(() -> {
- count.incrementAndGet();
- oper.setOutcome(PolicyResult.SUCCESS.toString());
- return oper;
- }, executor);
- // @formatter:on
+ CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
+ count.incrementAndGet();
+ return makeSuccess();
+ }, executor);
oper.setPreProcessor(preproc);
@@ -233,7 +251,7 @@ public class OperatorPartialTest {
assertNotNull(opstart);
assertNotNull(opend);
- assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome());
+ assertEquals(PolicyResult.SUCCESS, opend.getResult());
assertEquals(MAX_PARALLEL_REQUESTS, numStart);
assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
@@ -245,11 +263,7 @@ public class OperatorPartialTest {
*/
@Test
public void testStartPreprocessorFailure() {
- // arrange for the preprocessor to return a failure
- oper.setPreProcessor(oper -> {
- oper.setOutcome(PolicyResult.FAILURE_GUARD.toString());
- return CompletableFuture.completedFuture(oper);
- });
+ oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure()));
verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
}
@@ -260,9 +274,7 @@ public class OperatorPartialTest {
@Test
public void testStartPreprocessorException() {
// arrange for the preprocessor to throw an exception
- oper.setPreProcessor(oper -> {
- throw new IllegalStateException(EXPECTED_EXCEPTION);
- });
+ oper.setPreProcessor(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
}
@@ -273,10 +285,7 @@ public class OperatorPartialTest {
@Test
public void testStartPreprocessorNotRunning() {
// arrange for the preprocessor to return success, which will be ignored
- oper.setPreProcessor(oper -> {
- oper.setOutcome(PolicyResult.SUCCESS.toString());
- return CompletableFuture.completedFuture(oper);
- });
+ oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess()));
oper.startOperation(params).cancel(false);
assertTrue(executor.runAll());
@@ -296,8 +305,7 @@ public class OperatorPartialTest {
public void testStartPreprocessorBuilderException() {
oper = new MyOper() {
@Override
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
- ControlLoopOperationParams params) {
+ protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
throw new IllegalStateException(EXPECTED_EXCEPTION);
}
};
@@ -312,51 +320,27 @@ public class OperatorPartialTest {
}
@Test
- public void testDoPreprocessorAsFuture() {
- assertNull(oper.doPreprocessorAsFuture(params));
+ public void testStartPreprocessorAsync() {
+ assertNull(oper.startPreprocessorAsync(params));
}
@Test
- public void testStartOperationOnly_testDoOperationAsFuture() {
+ public void testStartOperationAsync() {
oper.startOperation(params);
assertTrue(executor.runAll());
assertEquals(1, oper.getCount());
}
- /**
- * Tests startOperationOnce() when
- * {@link OperatorPartial#doOperationAsFuture(ControlLoopOperationParams)} throws an
- * exception.
- */
- @Test
- public void testStartOperationOnceBuilderException() {
- oper = new MyOper() {
- @Override
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
- ControlLoopOperationParams params, int attempt) {
- throw new IllegalStateException(EXPECTED_EXCEPTION);
- }
- };
-
- oper.configure(new TreeMap<>());
- oper.start();
-
- assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
-
- // should be nothing in the queue
- assertEquals(0, executor.getQueueLength());
- }
-
@Test
public void testIsSuccess() {
- ControlLoopOperation outcome = new ControlLoopOperation();
+ OperationOutcome outcome = new OperationOutcome();
- outcome.setOutcome(PolicyResult.SUCCESS.toString());
+ outcome.setResult(PolicyResult.SUCCESS);
assertTrue(oper.isSuccess(outcome));
- for (String failure : FAILURE_STRINGS) {
- outcome.setOutcome(failure);
+ for (PolicyResult failure : FAILURE_RESULTS) {
+ outcome.setResult(failure);
assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
}
}
@@ -365,17 +349,17 @@ public class OperatorPartialTest {
public void testIsActorFailed() {
assertFalse(oper.isActorFailed(null));
- ControlLoopOperation outcome = params.makeOutcome();
+ OperationOutcome outcome = params.makeOutcome();
// incorrect outcome
- outcome.setOutcome(PolicyResult.SUCCESS.toString());
+ outcome.setResult(PolicyResult.SUCCESS);
assertFalse(oper.isActorFailed(outcome));
- outcome.setOutcome(PolicyResult.FAILURE_RETRIES.toString());
+ outcome.setResult(PolicyResult.FAILURE_RETRIES);
assertFalse(oper.isActorFailed(outcome));
// correct outcome
- outcome.setOutcome(PolicyResult.FAILURE.toString());
+ outcome.setResult(PolicyResult.FAILURE);
// incorrect actor
outcome.setActor(TARGET);
@@ -400,7 +384,12 @@ public class OperatorPartialTest {
/*
* Use an operator that doesn't override doOperation().
*/
- OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {};
+ OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {
+ @Override
+ protected Executor getBlockingExecutor() {
+ return executor;
+ }
+ };
oper2.configure(new TreeMap<>());
oper2.start();
@@ -409,7 +398,7 @@ public class OperatorPartialTest {
assertTrue(executor.runAll());
assertNotNull(opend);
- assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), opend.getOutcome());
+ assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
}
@Test
@@ -421,36 +410,34 @@ public class OperatorPartialTest {
// trigger timeout very quickly
oper = new MyOper() {
@Override
- protected long getTimeOutMillis(Policy policy) {
+ protected long getTimeOutMillis(Integer timeoutSec) {
return 1;
}
@Override
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
- ControlLoopOperationParams params, int attempt) {
-
- return outcome -> {
- ControlLoopOperation outcome2 = params.makeOutcome();
- outcome2.setOutcome(PolicyResult.SUCCESS.toString());
-
- /*
- * Create an incomplete future that will timeout after the operation's
- * timeout. If it fires before the other timer, then it will return a
- * SUCCESS outcome.
- */
- CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>();
- future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
- params.getExecutor());
-
- return future;
- };
+ protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params,
+ int attempt, OperationOutcome outcome) {
+
+ OperationOutcome outcome2 = params.makeOutcome();
+ outcome2.setResult(PolicyResult.SUCCESS);
+
+ /*
+ * Create an incomplete future that will timeout after the operation's
+ * timeout. If it fires before the other timer, then it will return a
+ * SUCCESS outcome.
+ */
+ CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
+ future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
+ params.getExecutor());
+
+ return future;
}
};
oper.configure(new TreeMap<>());
oper.start();
- assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), oper.startOperation(params).get().getOutcome());
+ assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.startOperation(params).get().getResult());
}
/**
@@ -466,40 +453,45 @@ public class OperatorPartialTest {
// trigger timeout very quickly
oper = new MyOper() {
@Override
- protected long getTimeOutMillis(Policy policy) {
+ protected long getTimeOutMillis(Integer timeoutSec) {
return 10;
}
@Override
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
- ControlLoopOperationParams params) {
-
- return outcome -> {
- outcome.setOutcome(PolicyResult.SUCCESS.toString());
-
- /*
- * Create an incomplete future that will timeout after the operation's
- * timeout. If it fires before the other timer, then it will return a
- * SUCCESS outcome.
- */
- CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>();
- future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome,
- params.getExecutor());
-
- return future;
+ protected Executor getBlockingExecutor() {
+ return command -> {
+ Thread thread = new Thread(command);
+ thread.start();
};
}
+
+ @Override
+ protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
+
+ OperationOutcome outcome = makeSuccess();
+
+ /*
+ * Create an incomplete future that will timeout after the operation's
+ * timeout. If it fires before the other timer, then it will return a
+ * SUCCESS outcome.
+ */
+ CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
+ future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome,
+ params.getExecutor());
+
+ return future;
+ }
};
oper.configure(new TreeMap<>());
oper.start();
- ControlLoopOperation result = oper.startOperation(params).get();
- assertEquals(PolicyResult.SUCCESS.toString(), result.getOutcome());
+ OperationOutcome result = oper.startOperation(params).get();
+ assertEquals(PolicyResult.SUCCESS, result.getResult());
assertNotNull(opstart);
assertNotNull(opend);
- assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome());
+ assertEquals(PolicyResult.SUCCESS, opend.getResult());
assertEquals(1, numStart);
assertEquals(1, oper.getCount());
@@ -510,8 +502,8 @@ public class OperatorPartialTest {
* Tests retry functions, when the count is set to zero and retries are exhausted.
*/
@Test
- public void testSetRetryFlag_testRetryOnFailure_ZeroRetries() {
- policy.setRetry(0);
+ public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
+ params = params.toBuilder().retry(0).build();
oper.setMaxFailures(10);
verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
@@ -522,7 +514,7 @@ public class OperatorPartialTest {
*/
@Test
public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
- policy.setRetry(null);
+ params = params.toBuilder().retry(null).build();
oper.setMaxFailures(10);
verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
@@ -534,10 +526,11 @@ public class OperatorPartialTest {
@Test
public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
final int maxRetries = 3;
- policy.setRetry(maxRetries);
+ params = params.toBuilder().retry(maxRetries).build();
oper.setMaxFailures(10);
- verifyRun("testVerifyRunningWhenNot", maxRetries + 1, maxRetries + 1, PolicyResult.FAILURE_RETRIES);
+ verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
+ PolicyResult.FAILURE_RETRIES);
}
/**
@@ -545,7 +538,7 @@ public class OperatorPartialTest {
*/
@Test
public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
- policy.setRetry(10);
+ params = params.toBuilder().retry(10).build();
final int maxFailures = 3;
oper.setMaxFailures(maxFailures);
@@ -563,8 +556,8 @@ public class OperatorPartialTest {
// arrange to return null from doOperation()
oper = new MyOper() {
@Override
- protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
- ControlLoopOperation operation) {
+ protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt,
+ OperationOutcome operation) {
// update counters
super.doOperation(params, attempt, operation);
@@ -579,96 +572,55 @@ public class OperatorPartialTest {
}
@Test
- public void testGetActorOutcome() {
- assertNull(oper.getActorOutcome(null));
+ public void testIsSameOperation() {
+ assertFalse(oper.isSameOperation(null));
- ControlLoopOperation outcome = params.makeOutcome();
- outcome.setOutcome(TARGET);
+ OperationOutcome outcome = params.makeOutcome();
- // wrong actor - should be null
+ // wrong actor - should be false
outcome.setActor(null);
- assertNull(oper.getActorOutcome(outcome));
+ assertFalse(oper.isSameOperation(outcome));
outcome.setActor(TARGET);
- assertNull(oper.getActorOutcome(outcome));
+ assertFalse(oper.isSameOperation(outcome));
outcome.setActor(ACTOR);
// wrong operation - should be null
outcome.setOperation(null);
- assertNull(oper.getActorOutcome(outcome));
+ assertFalse(oper.isSameOperation(outcome));
outcome.setOperation(TARGET);
- assertNull(oper.getActorOutcome(outcome));
+ assertFalse(oper.isSameOperation(outcome));
outcome.setOperation(OPERATOR);
- assertEquals(TARGET, oper.getActorOutcome(outcome));
- }
-
- @Test
- public void testOnSuccess() throws Exception {
- AtomicInteger count = new AtomicInteger();
-
- final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep = oper -> {
- count.incrementAndGet();
- return CompletableFuture.completedFuture(oper);
- };
-
- // pass it a null outcome
- ControlLoopOperation outcome = oper.onSuccess(params, nextStep).apply(null).get();
- assertNotNull(outcome);
- assertEquals(PolicyResult.FAILURE.toString(), outcome.getOutcome());
- assertEquals(0, count.get());
-
- // pass it an unpopulated (i.e., failed) outcome
- outcome = new ControlLoopOperation();
- assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get());
- assertEquals(0, count.get());
-
- // pass it a successful outcome
- outcome = params.makeOutcome();
- outcome.setOutcome(PolicyResult.SUCCESS.toString());
- assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get());
- assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome());
- assertEquals(1, count.get());
+ assertTrue(oper.isSameOperation(outcome));
}
/**
- * Tests onSuccess() and handleFailure() when the outcome is a success.
+ * Tests handleFailure() when the outcome is a success.
*/
@Test
- public void testOnSuccessTrue_testHandleFailureTrue() {
- // arrange to return a success from the preprocessor
- oper.setPreProcessor(oper -> {
- oper.setOutcome(PolicyResult.SUCCESS.toString());
- return CompletableFuture.completedFuture(oper);
- });
-
- verifyRun("testOnSuccessTrue_testHandleFailureTrue", 1, 1, PolicyResult.SUCCESS);
+ public void testHandlePreprocessorFailureTrue() {
+ oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess()));
+ verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
}
/**
- * Tests onSuccess() and handleFailure() when the outcome is <i>not</i> a success.
+ * Tests handleFailure() when the outcome is <i>not</i> a success.
*/
@Test
- public void testOnSuccessFalse_testHandleFailureFalse() throws Exception {
- // arrange to return a failure from the preprocessor
- oper.setPreProcessor(oper -> {
- oper.setOutcome(PolicyResult.FAILURE.toString());
- return CompletableFuture.completedFuture(oper);
- });
-
- verifyRun("testOnSuccessFalse_testHandleFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
+ public void testHandlePreprocessorFailureFalse() throws Exception {
+ oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure()));
+ verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
}
/**
- * Tests onSuccess() and handleFailure() when the outcome is {@code null}.
+ * Tests handleFailure() when the outcome is {@code null}.
*/
@Test
- public void testOnSuccessFalse_testHandleFailureNull() throws Exception {
+ public void testHandlePreprocessorFailureNull() throws Exception {
// arrange to return null from the preprocessor
- oper.setPreProcessor(oper -> {
- return CompletableFuture.completedFuture(null);
- });
+ oper.setPreProcessor(CompletableFuture.completedFuture(null));
- verifyRun("testOnSuccessFalse_testHandleFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
+ verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
}
@Test
@@ -688,11 +640,374 @@ public class OperatorPartialTest {
}
/**
- * Tests verifyRunning() when the pipeline is not running.
+ * Tests both flavors of anyOf(), because one invokes the other.
+ */
+ @Test
+ public void testAnyOf() throws Exception {
+ // first task completes, others do not
+ List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+
+ final OperationOutcome outcome = params.makeOutcome();
+
+ tasks.add(CompletableFuture.completedFuture(outcome));
+ tasks.add(new CompletableFuture<>());
+ tasks.add(new CompletableFuture<>());
+
+ CompletableFuture<OperationOutcome> result = oper.anyOf(params, tasks);
+ assertTrue(executor.runAll());
+
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+
+ // second task completes, others do not
+ tasks = new LinkedList<>();
+
+ tasks.add(new CompletableFuture<>());
+ tasks.add(CompletableFuture.completedFuture(outcome));
+ tasks.add(new CompletableFuture<>());
+
+ result = oper.anyOf(params, tasks);
+ assertTrue(executor.runAll());
+
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+
+ // third task completes, others do not
+ tasks = new LinkedList<>();
+
+ tasks.add(new CompletableFuture<>());
+ tasks.add(new CompletableFuture<>());
+ tasks.add(CompletableFuture.completedFuture(outcome));
+
+ result = oper.anyOf(params, tasks);
+ assertTrue(executor.runAll());
+
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+ }
+
+ /**
+ * Tests both flavors of allOf(), because one invokes the other.
+ */
+ @Test
+ public void testAllOf() throws Exception {
+ List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+
+ final OperationOutcome outcome = params.makeOutcome();
+
+ CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+
+ tasks.add(future1);
+ tasks.add(future2);
+ tasks.add(future3);
+
+ CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
+
+ assertTrue(executor.runAll());
+ assertFalse(result.isDone());
+ future1.complete(outcome);
+
+ // complete 3 before 2
+ assertTrue(executor.runAll());
+ assertFalse(result.isDone());
+ future3.complete(outcome);
+
+ assertTrue(executor.runAll());
+ assertFalse(result.isDone());
+ future2.complete(outcome);
+
+ // all of them are now done
+ assertTrue(executor.runAll());
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+ }
+
+ @Test
+ public void testCombineOutcomes() throws Exception {
+ // only one outcome
+ verifyOutcomes(0, PolicyResult.SUCCESS);
+ verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
+
+ // maximum is in different positions
+ verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
+ verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
+ verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
+
+ // null outcome
+ final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ tasks.add(CompletableFuture.completedFuture(null));
+ CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
+
+ assertTrue(executor.runAll());
+ assertTrue(result.isDone());
+ assertNull(result.get());
+
+ // one throws an exception during execution
+ IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
+
+ tasks.clear();
+ tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
+ tasks.add(CompletableFuture.failedFuture(except));
+ tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
+ result = oper.allOf(params, tasks);
+
+ assertTrue(executor.runAll());
+ assertTrue(result.isCompletedExceptionally());
+ result.whenComplete((unused, thrown) -> assertSame(except, thrown));
+ }
+
+ private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
+ List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+
+
+ OperationOutcome expectedOutcome = null;
+
+ for (int count = 0; count < results.length; ++count) {
+ OperationOutcome outcome = params.makeOutcome();
+ outcome.setResult(results[count]);
+ tasks.add(CompletableFuture.completedFuture(outcome));
+
+ if (count == expected) {
+ expectedOutcome = outcome;
+ }
+ }
+
+ CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
+
+ assertTrue(executor.runAll());
+ assertTrue(result.isDone());
+ assertSame(expectedOutcome, result.get());
+ }
+
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
+ final OperationOutcome taskOutcome) {
+
+ return outcome -> CompletableFuture.completedFuture(taskOutcome);
+ }
+
+ @Test
+ public void testDetmPriority() {
+ assertEquals(1, oper.detmPriority(null));
+
+ OperationOutcome outcome = params.makeOutcome();
+
+ Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
+ PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
+ PolicyResult.FAILURE_EXCEPTION, 6);
+
+ for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
+ outcome.setResult(ent.getKey());
+ assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
+ }
+ }
+
+ /**
+ * Tests doTask(Future) when the controller is not running.
+ */
+ @Test
+ public void testDoTaskFutureNotRunning() throws Exception {
+ CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ controller.complete(params.makeOutcome());
+
+ CompletableFuture<OperationOutcome> future =
+ oper.doTask(params, controller, false, params.makeOutcome(), taskFuture);
+ assertFalse(future.isDone());
+ assertTrue(executor.runAll());
+
+ // should not have run the task
+ assertFalse(future.isDone());
+
+ // should have canceled the task future
+ assertTrue(taskFuture.isCancelled());
+ }
+
+ /**
+ * Tests doTask(Future) when the previous outcome was successful.
+ */
+ @Test
+ public void testDoTaskFutureSuccess() throws Exception {
+ CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
+ final OperationOutcome taskOutcome = params.makeOutcome();
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<OperationOutcome> future =
+ oper.doTask(params, controller, true, params.makeOutcome(), taskFuture);
+
+ taskFuture.complete(taskOutcome);
+ assertTrue(executor.runAll());
+
+ assertTrue(future.isDone());
+ assertSame(taskOutcome, future.get());
+
+ // controller should not be done yet
+ assertFalse(controller.isDone());
+ }
+
+ /**
+ * Tests doTask(Future) when the previous outcome was failed.
+ */
+ @Test
+ public void testDoTaskFutureFailure() throws Exception {
+ CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
+ final OperationOutcome failedOutcome = params.makeOutcome();
+ failedOutcome.setResult(PolicyResult.FAILURE);
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, failedOutcome, taskFuture);
+ assertFalse(future.isDone());
+ assertTrue(executor.runAll());
+
+ // should not have run the task
+ assertFalse(future.isDone());
+
+ // should have canceled the task future
+ assertTrue(taskFuture.isCancelled());
+
+ // controller SHOULD be done now
+ assertTrue(controller.isDone());
+ assertSame(failedOutcome, controller.get());
+ }
+
+ /**
+ * Tests doTask(Future) when the previous outcome was failed, but not checking
+ * success.
+ */
+ @Test
+ public void testDoTaskFutureUncheckedFailure() throws Exception {
+ CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
+ final OperationOutcome failedOutcome = params.makeOutcome();
+ failedOutcome.setResult(PolicyResult.FAILURE);
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, failedOutcome, taskFuture);
+ assertFalse(future.isDone());
+
+ // complete the task
+ OperationOutcome taskOutcome = params.makeOutcome();
+ taskFuture.complete(taskOutcome);
+
+ assertTrue(executor.runAll());
+
+ // should have run the task
+ assertTrue(future.isDone());
+
+ assertTrue(future.isDone());
+ assertSame(taskOutcome, future.get());
+
+ // controller should not be done yet
+ assertFalse(controller.isDone());
+ }
+
+ /**
+ * Tests doTask(Function) when the controller is not running.
+ */
+ @Test
+ public void testDoTaskFunctionNotRunning() throws Exception {
+ AtomicBoolean invoked = new AtomicBoolean();
+
+ Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
+ invoked.set(true);
+ return CompletableFuture.completedFuture(params.makeOutcome());
+ };
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ controller.complete(params.makeOutcome());
+
+ CompletableFuture<OperationOutcome> future =
+ oper.doTask(params, controller, false, task).apply(params.makeOutcome());
+ assertFalse(future.isDone());
+ assertTrue(executor.runAll());
+
+ // should not have run the task
+ assertFalse(future.isDone());
+
+ // should not have even invoked the task
+ assertFalse(invoked.get());
+ }
+
+ /**
+ * Tests doTask(Function) when the previous outcome was successful.
+ */
+ @Test
+ public void testDoTaskFunctionSuccess() throws Exception {
+ final OperationOutcome taskOutcome = params.makeOutcome();
+
+ final OperationOutcome failedOutcome = params.makeOutcome();
+
+ Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome);
+
+ assertTrue(future.isDone());
+ assertSame(taskOutcome, future.get());
+
+ // controller should not be done yet
+ assertFalse(controller.isDone());
+ }
+
+ /**
+ * Tests doTask(Function) when the previous outcome was failed.
+ */
+ @Test
+ public void testDoTaskFunctionFailure() throws Exception {
+ final OperationOutcome failedOutcome = params.makeOutcome();
+ failedOutcome.setResult(PolicyResult.FAILURE);
+
+ AtomicBoolean invoked = new AtomicBoolean();
+
+ Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
+ invoked.set(true);
+ return CompletableFuture.completedFuture(params.makeOutcome());
+ };
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome);
+ assertFalse(future.isDone());
+ assertTrue(executor.runAll());
+
+ // should not have run the task
+ assertFalse(future.isDone());
+
+ // should not have even invoked the task
+ assertFalse(invoked.get());
+
+ // controller should have the failed task
+ assertTrue(controller.isDone());
+ assertSame(failedOutcome, controller.get());
+ }
+
+ /**
+ * Tests doTask(Function) when the previous outcome was failed, but not checking
+ * success.
*/
@Test
- public void testVerifyRunningWhenNot() {
- verifyRun("testVerifyRunningWhenNot", 0, 0, PolicyResult.SUCCESS, future -> future.cancel(false));
+ public void testDoTaskFunctionUncheckedFailure() throws Exception {
+ final OperationOutcome taskOutcome = params.makeOutcome();
+
+ final OperationOutcome failedOutcome = params.makeOutcome();
+ failedOutcome.setResult(PolicyResult.FAILURE);
+
+ Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, task).apply(failedOutcome);
+
+ assertTrue(future.isDone());
+ assertSame(taskOutcome, future.get());
+
+ // controller should not be done yet
+ assertFalse(controller.isDone());
}
/**
@@ -700,7 +1015,7 @@ public class OperatorPartialTest {
*/
@Test
public void testCallbackStartedNotRunning() {
- AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>();
+ AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
/*
* arrange to stop the controller when the start-callback is invoked, but capture
@@ -723,7 +1038,7 @@ public class OperatorPartialTest {
*/
@Test
public void testCallbackCompletedNotRunning() {
- AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>();
+ AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
// arrange to stop the controller when the start-callback is invoked
params = params.toBuilder().startCallback(oper -> {
@@ -739,36 +1054,36 @@ public class OperatorPartialTest {
}
@Test
- public void testSetOutcomeControlLoopOperationThrowable() {
+ public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
- ControlLoopOperation outcome;
+ OperationOutcome outcome;
- outcome = new ControlLoopOperation();
+ outcome = new OperationOutcome();
oper.setOutcome(params, outcome, timex);
assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
- assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), outcome.getOutcome());
+ assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
- outcome = new ControlLoopOperation();
- oper.setOutcome(params, outcome, new IllegalStateException());
+ outcome = new OperationOutcome();
+ oper.setOutcome(params, outcome, new IllegalStateException(EXPECTED_EXCEPTION));
assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
- assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), outcome.getOutcome());
+ assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
}
@Test
- public void testSetOutcomeControlLoopOperationPolicyResult() {
- ControlLoopOperation outcome;
+ public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
+ OperationOutcome outcome;
- outcome = new ControlLoopOperation();
+ outcome = new OperationOutcome();
oper.setOutcome(params, outcome, PolicyResult.SUCCESS);
assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
- assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome());
+ assertEquals(PolicyResult.SUCCESS, outcome.getResult());
for (PolicyResult result : FAILURE_RESULTS) {
- outcome = new ControlLoopOperation();
+ outcome = new OperationOutcome();
oper.setOutcome(params, outcome, result);
assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
- assertEquals(result.toString(), result.toString(), outcome.getOutcome());
+ assertEquals(result.toString(), result, outcome.getResult());
}
}
@@ -776,7 +1091,7 @@ public class OperatorPartialTest {
public void testIsTimeout() {
final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
- assertFalse(oper.isTimeout(new IllegalStateException()));
+ assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
assertFalse(oper.isTimeout(new IllegalStateException(timex)));
assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
assertFalse(oper.isTimeout(new CompletionException(null)));
@@ -788,19 +1103,19 @@ public class OperatorPartialTest {
@Test
public void testGetTimeOutMillis() {
- assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(policy));
+ assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(params.getTimeoutSec()));
- policy.setTimeout(null);
- assertEquals(0, oper.getTimeOutMillis(policy));
+ params = params.toBuilder().timeoutSec(null).build();
+ assertEquals(0, oper.getTimeOutMillis(params.getTimeoutSec()));
}
- private void starter(ControlLoopOperation oper) {
+ private void starter(OperationOutcome oper) {
++numStart;
tstart = oper.getStart();
opstart = oper;
}
- private void completer(ControlLoopOperation oper) {
+ private void completer(OperationOutcome oper) {
++numEnd;
opend = oper;
}
@@ -816,21 +1131,18 @@ public class OperatorPartialTest {
};
}
- /**
- * Verifies a run.
- *
- * @param testName test name
- * @param expectedCallbacks number of callbacks expected
- * @param expectedOperations number of operation invocations expected
- * @param expectedResult expected outcome
- */
- private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
- PolicyResult expectedResult) {
+ private OperationOutcome makeSuccess() {
+ OperationOutcome outcome = params.makeOutcome();
+ outcome.setResult(PolicyResult.SUCCESS);
- String expectedSubRequestId =
- (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
+ return outcome;
+ }
- verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
+ private OperationOutcome makeFailure() {
+ OperationOutcome outcome = params.makeOutcome();
+ outcome.setResult(PolicyResult.FAILURE);
+
+ return outcome;
}
/**
@@ -840,17 +1152,14 @@ public class OperatorPartialTest {
* @param expectedCallbacks number of callbacks expected
* @param expectedOperations number of operation invocations expected
* @param expectedResult expected outcome
- * @param manipulator function to modify the future returned by
- * {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
- * the tasks in the executor are run
*/
- private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
- Consumer<CompletableFuture<ControlLoopOperation>> manipulator) {
+ private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
+ PolicyResult expectedResult) {
String expectedSubRequestId =
(expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
- verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, manipulator);
+ verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
}
/**
@@ -866,9 +1175,9 @@ public class OperatorPartialTest {
* the tasks in the executor are run
*/
private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
- String expectedSubRequestId, Consumer<CompletableFuture<ControlLoopOperation>> manipulator) {
+ String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
- CompletableFuture<ControlLoopOperation> future = oper.startOperation(params);
+ CompletableFuture<OperationOutcome> future = oper.startOperation(params);
manipulator.accept(future);
@@ -880,7 +1189,7 @@ public class OperatorPartialTest {
if (expectedCallbacks > 0) {
assertNotNull(testName, opstart);
assertNotNull(testName, opend);
- assertEquals(testName, expectedResult.toString(), opend.getOutcome());
+ assertEquals(testName, expectedResult, opend.getResult());
assertSame(testName, tstart, opstart.getStart());
assertSame(testName, tstart, opend.getStart());
@@ -901,7 +1210,7 @@ public class OperatorPartialTest {
assertEquals(testName, expectedOperations, oper.getCount());
}
- private static class MyOper extends OperatorPartial {
+ private class MyOper extends OperatorPartial {
@Getter
private int count = 0;
@@ -912,15 +1221,15 @@ public class OperatorPartialTest {
private int maxFailures = 0;
@Setter
- private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preProcessor;
+ private CompletableFuture<OperationOutcome> preProcessor;
public MyOper() {
super(ACTOR, OPERATOR);
}
@Override
- protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
- ControlLoopOperation operation) {
+ protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt,
+ OperationOutcome operation) {
++count;
if (genException) {
throw new IllegalStateException(EXPECTED_EXCEPTION);
@@ -929,19 +1238,22 @@ public class OperatorPartialTest {
operation.setSubRequestId(String.valueOf(attempt));
if (count > maxFailures) {
- operation.setOutcome(PolicyResult.SUCCESS.toString());
+ operation.setResult(PolicyResult.SUCCESS);
} else {
- operation.setOutcome(PolicyResult.FAILURE.toString());
+ operation.setResult(PolicyResult.FAILURE);
}
return operation;
}
@Override
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
- ControlLoopOperationParams params) {
+ protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
+ return (preProcessor != null ? preProcessor : super.startPreprocessorAsync(params));
+ }
- return (preProcessor != null ? preProcessor : super.doPreprocessorAsFuture(params));
+ @Override
+ protected Executor getBlockingExecutor() {
+ return executor;
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java
index 0c8e77d38..9dd19d548 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParamsTest.java
@@ -35,6 +35,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -47,20 +49,23 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.onap.policy.common.parameters.BeanValidationResult;
-import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams.ControlLoopOperationParamsBuilder;
import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
-import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.Target;
public class ControlLoopOperationParamsTest {
private static final String EXPECTED_EXCEPTION = "expected exception";
private static final String ACTOR = "my-actor";
private static final String OPERATION = "my-operation";
- private static final String TARGET = "my-target";
+ private static final Target TARGET = new Target();
+ private static final String TARGET_ENTITY = "my-target";
+ private static final Integer RETRY = 3;
+ private static final Integer TIMEOUT = 100;
private static final UUID REQ_ID = UUID.randomUUID();
@Mock
@@ -70,7 +75,7 @@ public class ControlLoopOperationParamsTest {
private ActorService actorService;
@Mock
- private Consumer<ControlLoopOperation> completer;
+ private Consumer<OperationOutcome> completer;
@Mock
private ControlLoopEventContext context;
@@ -82,19 +87,18 @@ public class ControlLoopOperationParamsTest {
private Executor executor;
@Mock
- private CompletableFuture<ControlLoopOperation> operation;
+ private CompletableFuture<OperationOutcome> operation;
@Mock
private Operator operator;
@Mock
- private Policy policy;
+ private Consumer<OperationOutcome> starter;
- @Mock
- private Consumer<ControlLoopOperation> starter;
+ private Map<String, String> payload;
private ControlLoopOperationParams params;
- private ControlLoopOperation outcome;
+ private OperationOutcome outcome;
/**
@@ -112,12 +116,12 @@ public class ControlLoopOperationParamsTest {
when(context.getEvent()).thenReturn(event);
- when(policy.getActor()).thenReturn(ACTOR);
- when(policy.getRecipe()).thenReturn(OPERATION);
+ payload = new TreeMap<>();
params = ControlLoopOperationParams.builder().actorService(actorService).completeCallback(completer)
- .context(context).executor(executor).policy(policy).startCallback(starter).target(TARGET)
- .build();
+ .context(context).executor(executor).actor(ACTOR).operation(OPERATION).payload(payload)
+ .retry(RETRY).target(TARGET).targetEntity(TARGET_ENTITY).timeoutSec(TIMEOUT)
+ .startCallback(starter).build();
outcome = params.makeOutcome();
}
@@ -130,30 +134,6 @@ public class ControlLoopOperationParamsTest {
}
@Test
- public void testGetActor() {
- assertEquals(ACTOR, params.getActor());
-
- // try with null policy
- assertEquals(ControlLoopOperationParams.UNKNOWN, params.toBuilder().policy(null).build().getActor());
-
- // try with null name in the policy
- when(policy.getActor()).thenReturn(null);
- assertEquals(ControlLoopOperationParams.UNKNOWN, params.getActor());
- }
-
- @Test
- public void testGetOperation() {
- assertEquals(OPERATION, params.getOperation());
-
- // try with null policy
- assertEquals(ControlLoopOperationParams.UNKNOWN, params.toBuilder().policy(null).build().getOperation());
-
- // try with null name in the policy
- when(policy.getRecipe()).thenReturn(null);
- assertEquals(ControlLoopOperationParams.UNKNOWN, params.getOperation());
- }
-
- @Test
public void testGetRequestId() {
assertSame(REQ_ID, params.getRequestId());
@@ -170,20 +150,14 @@ public class ControlLoopOperationParamsTest {
assertEquals(ACTOR, outcome.getActor());
assertEquals(OPERATION, outcome.getOperation());
checkRemainingFields("with actor");
-
- // try again with a null policy
- outcome = params.toBuilder().policy(null).build().makeOutcome();
- assertEquals(ControlLoopOperationParams.UNKNOWN, outcome.getActor());
- assertEquals(ControlLoopOperationParams.UNKNOWN, outcome.getOperation());
- checkRemainingFields("unknown actor");
}
protected void checkRemainingFields(String testName) {
- assertEquals(testName, TARGET, outcome.getTarget());
- assertNotNull(testName, outcome.getStart());
+ assertEquals(testName, TARGET_ENTITY, outcome.getTarget());
+ assertNull(testName, outcome.getStart());
assertNull(testName, outcome.getEnd());
assertNull(testName, outcome.getSubRequestId());
- assertNull(testName, outcome.getOutcome());
+ assertNotNull(testName, outcome.getResult());
assertNull(testName, outcome.getMessage());
}
@@ -239,21 +213,23 @@ public class ControlLoopOperationParamsTest {
@Test
public void testValidateFields() {
+ testValidate("actor", "null", bldr -> bldr.actor(null));
testValidate("actorService", "null", bldr -> bldr.actorService(null));
testValidate("context", "null", bldr -> bldr.context(null));
testValidate("executor", "null", bldr -> bldr.executor(null));
- testValidate("policy", "null", bldr -> bldr.policy(null));
- testValidate("target", "null", bldr -> bldr.target(null));
+ testValidate("operation", "null", bldr -> bldr.operation(null));
+ testValidate("target", "null", bldr -> bldr.targetEntity(null));
// check edge cases
assertTrue(params.toBuilder().build().validate().isValid());
// these can be null
- assertTrue(params.toBuilder().startCallback(null).completeCallback(null).build().validate().isValid());
+ assertTrue(params.toBuilder().payload(null).retry(null).target(null).timeoutSec(null).startCallback(null)
+ .completeCallback(null).build().validate().isValid());
// test with minimal fields
- assertTrue(ControlLoopOperationParams.builder().actorService(actorService).context(context).policy(policy)
- .target(TARGET).build().validate().isValid());
+ assertTrue(ControlLoopOperationParams.builder().actorService(actorService).context(context).actor(ACTOR)
+ .operation(OPERATION).targetEntity(TARGET_ENTITY).build().validate().isValid());
}
private void testValidate(String fieldName, String expected,
@@ -275,7 +251,12 @@ public class ControlLoopOperationParamsTest {
}
@Test
- public void testActorService() {
+ public void testGetActor() {
+ assertSame(ACTOR, params.getActor());
+ }
+
+ @Test
+ public void testGetActorService() {
assertSame(actorService, params.getActorService());
}
@@ -293,8 +274,43 @@ public class ControlLoopOperationParamsTest {
}
@Test
- public void testGetPolicy() {
- assertSame(policy, params.getPolicy());
+ public void testGetOperation() {
+ assertSame(OPERATION, params.getOperation());
+ }
+
+ @Test
+ public void testGetPayload() {
+ assertSame(payload, params.getPayload());
+
+ // should be null when unspecified
+ assertNull(ControlLoopOperationParams.builder().build().getPayload());
+ }
+
+ @Test
+ public void testGetRetry() {
+ assertSame(RETRY, params.getRetry());
+
+ // should be null when unspecified
+ assertNull(ControlLoopOperationParams.builder().build().getRetry());
+ }
+
+ @Test
+ public void testTarget() {
+ assertSame(TARGET, params.getTarget());
+
+ // should be null when unspecified
+ assertNull(ControlLoopOperationParams.builder().build().getTarget());
+ }
+
+ @Test
+ public void testGetTimeoutSec() {
+ assertSame(TIMEOUT, params.getTimeoutSec());
+
+ // should be 300 when unspecified
+ assertEquals(Integer.valueOf(300), ControlLoopOperationParams.builder().build().getTimeoutSec());
+
+ // null should be ok too
+ assertNull(ControlLoopOperationParams.builder().timeoutSec(null).build().getTimeoutSec());
}
@Test
@@ -308,7 +324,7 @@ public class ControlLoopOperationParamsTest {
}
@Test
- public void testGetTarget() {
- assertEquals(TARGET, params.getTarget());
+ public void testGetTargetEntity() {
+ assertEquals(TARGET_ENTITY, params.getTargetEntity());
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java
index 1763388f2..6c1f538ec 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java
@@ -90,6 +90,8 @@ public class HttpActorParamsTest {
@Test
public void testValidate() {
+ assertTrue(params.validate(CONTAINER).isValid());
+
testValidateField("clientName", "null", params2 -> params2.setClientName(null));
testValidateField("path", "null", params2 -> params2.setPath(null));
testValidateField("timeoutSec", "minimum", params2 -> params2.setTimeoutSec(-1));
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java
index 829c480d1..6cf7328ca 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java
@@ -47,6 +47,8 @@ public class HttpParamsTest {
@Test
public void testValidate() {
+ assertTrue(params.validate(CONTAINER).isValid());
+
testValidateField("clientName", "null", bldr -> bldr.clientName(null));
testValidateField("path", "null", bldr -> bldr.path(null));
testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1));
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java
index b421c1ce2..a6b11ef65 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFutureTest.java
@@ -23,21 +23,27 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -58,9 +64,10 @@ public class PipelineControllerFutureTest {
private Future<String> future2;
@Mock
- private CompletableFuture<String> compFuture;
+ private Executor executor;
+ private CompletableFuture<String> compFuture;
private PipelineControllerFuture<String> controller;
@@ -72,6 +79,8 @@ public class PipelineControllerFutureTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
+ compFuture = spy(new CompletableFuture<>());
+
controller = new PipelineControllerFuture<>();
controller.add(runnable1);
@@ -89,10 +98,7 @@ public class PipelineControllerFutureTest {
assertTrue(controller.isCancelled());
assertFalse(controller.isRunning());
- verify(runnable1).run();
- verify(runnable2).run();
- verify(future1).cancel(anyBoolean());
- verify(future2).cancel(anyBoolean());
+ verifyStopped();
// re-invoke; nothing should change
assertTrue(controller.cancel(true));
@@ -100,10 +106,155 @@ public class PipelineControllerFutureTest {
assertTrue(controller.isCancelled());
assertFalse(controller.isRunning());
- verify(runnable1).run();
- verify(runnable2).run();
- verify(future1).cancel(anyBoolean());
- verify(future2).cancel(anyBoolean());
+ verifyStopped();
+ }
+
+ @Test
+ public void testCompleteT() throws Exception {
+ assertTrue(controller.complete(TEXT));
+ assertEquals(TEXT, controller.get());
+
+ verifyStopped();
+
+ // repeat - disallowed
+ assertFalse(controller.complete(TEXT));
+ }
+
+ @Test
+ public void testCompleteExceptionallyThrowable() {
+ assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION));
+ assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
+
+ verifyStopped();
+
+ // repeat - disallowed
+ assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION));
+ }
+
+ @Test
+ public void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception {
+ CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
+
+ // haven't stopped anything yet
+ assertFalse(future.isDone());
+ verify(runnable1, never()).run();
+
+ // get the operation and run it
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(executor).execute(captor.capture());
+ captor.getValue().run();
+
+ // should be done now
+ assertTrue(future.isDone());
+
+ assertEquals(TEXT, future.get());
+
+ verifyStopped();
+ }
+
+ /**
+ * Tests completeAsync(executor) when canceled before execution.
+ */
+ @Test
+ public void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() throws Exception {
+ CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
+
+ assertTrue(future.cancel(false));
+
+ verifyStopped();
+
+ assertTrue(future.isDone());
+
+ assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+ }
+
+ @Test
+ public void testCompleteAsyncSupplierOfQextendsT() throws Exception {
+ CompletableFuture<String> future = controller.completeAsync(() -> TEXT);
+ assertEquals(TEXT, future.get());
+
+ verifyStopped();
+ }
+
+ /**
+ * Tests completeAsync() when canceled.
+ */
+ @Test
+ public void testCompleteAsyncSupplierOfQextendsTCanceled() throws Exception {
+ CountDownLatch canceled = new CountDownLatch(1);
+
+ // run async, but await until canceled
+ CompletableFuture<String> future = controller.completeAsync(() -> {
+ try {
+ canceled.await();
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+
+ return TEXT;
+ });
+
+ assertTrue(future.cancel(false));
+
+ // let the future run now
+ canceled.countDown();
+
+ verifyStopped();
+
+ assertTrue(future.isDone());
+
+ assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+ }
+
+ @Test
+ public void testCompleteOnTimeoutTLongTimeUnit() throws Exception {
+ CountDownLatch stopped = new CountDownLatch(1);
+ controller.add(() -> stopped.countDown());
+
+ CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS);
+
+ assertEquals(TEXT, future.get());
+
+ /*
+ * Must use latch instead of verifyStopped(), because the runnables may be
+ * executed asynchronously.
+ */
+ assertTrue(stopped.await(5, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Tests completeOnTimeout() when completed before the timeout.
+ */
+ @Test
+ public void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception {
+ CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS);
+ controller.complete(TEXT);
+
+ assertEquals(TEXT, future.get());
+
+ verifyStopped();
+ }
+
+ /**
+ * Tests completeOnTimeout() when canceled before the timeout.
+ */
+ @Test
+ public void testCompleteOnTimeoutTLongTimeUnitCanceled() {
+ CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS);
+ assertTrue(future.cancel(true));
+
+ assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+
+ verifyStopped();
+ }
+
+ @Test
+ public void testNewIncompleteFuture() {
+ PipelineControllerFuture<String> future = controller.newIncompleteFuture();
+ assertNotNull(future);
+ assertTrue(future instanceof PipelineControllerFuture);
+ assertNotSame(controller, future);
+ assertFalse(future.isDone());
}
@Test
@@ -208,22 +359,81 @@ public class PipelineControllerFutureTest {
verify(future2).cancel(anyBoolean());
}
+ /**
+ * Tests both wrap() methods.
+ */
@Test
- public void testAddFunction() {
- AtomicReference<String> value = new AtomicReference<>();
+ public void testWrap() throws Exception {
+ controller = spy(controller);
- Function<String, CompletableFuture<String>> func = controller.add(input -> {
- value.set(input);
+ CompletableFuture<String> future = controller.wrap(compFuture);
+ verify(controller, never()).remove(compFuture);
+
+ compFuture.complete(TEXT);
+ assertEquals(TEXT, future.get());
+
+ verify(controller).remove(compFuture);
+ }
+
+ /**
+ * Tests wrap(), when the controller is not running.
+ */
+ @Test
+ public void testWrapNotRunning() throws Exception {
+ controller.cancel(false);
+ controller = spy(controller);
+
+ assertFalse(controller.wrap(compFuture).isDone());
+ verify(controller, never()).add(compFuture);
+ verify(controller, never()).remove(compFuture);
+
+ verify(compFuture).cancel(anyBoolean());
+ }
+
+ /**
+ * Tests wrap(), when the future throws an exception.
+ */
+ @Test
+ public void testWrapException() throws Exception {
+ controller = spy(controller);
+
+ CompletableFuture<String> future = controller.wrap(compFuture);
+ verify(controller, never()).remove(compFuture);
+
+ compFuture.completeExceptionally(EXPECTED_EXCEPTION);
+ assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION);
+
+ verify(controller).remove(compFuture);
+ }
+
+ @Test
+ public void testWrapFunction() throws Exception {
+
+ Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
+ compFuture.complete(input);
return compFuture;
});
- assertSame(compFuture, func.apply(TEXT));
- assertEquals(TEXT, value.get());
+ CompletableFuture<String> future = func.apply(TEXT);
+ assertTrue(compFuture.isDone());
- verify(compFuture, never()).cancel(anyBoolean());
+ assertEquals(TEXT, future.get());
// should not have completed the controller
assertFalse(controller.isDone());
+ }
+
+ /**
+ * Tests add(Function) when the controller is canceled after the future is added.
+ */
+ @Test
+ public void testWrapFunctionCancel() throws Exception {
+ Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture);
+
+ CompletableFuture<String> future = func.apply(TEXT);
+ assertFalse(future.isDone());
+
+ assertFalse(compFuture.isDone());
// cancel - should propagate
controller.cancel(false);
@@ -235,10 +445,10 @@ public class PipelineControllerFutureTest {
* Tests add(Function) when the controller is not running.
*/
@Test
- public void testAddFunctionNotRunning() {
+ public void testWrapFunctionNotRunning() {
AtomicReference<String> value = new AtomicReference<>();
- Function<String, CompletableFuture<String>> func = controller.add(input -> {
+ Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
value.set(input);
return compFuture;
});
@@ -251,4 +461,11 @@ public class PipelineControllerFutureTest {
assertNull(value.get());
}
+
+ private void verifyStopped() {
+ verify(runnable1).run();
+ verify(runnable2).run();
+ verify(future1).cancel(anyBoolean());
+ verify(future2).cancel(anyBoolean());
+ }
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml
index f8a1e5112..c7fe46e47 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml
+++ b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml
@@ -35,8 +35,8 @@
<appender-ref ref="STDOUT" />
</root>
- <!-- this is just an example -->
- <logger name="ch.qos.logback.classic" level="off" additivity="false">
+ <!-- this is required for UtilTest -->
+ <logger name="org.onap.policy.controlloop.actorserviceprovider.Util" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
</configuration>