summaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors/actorServiceProvider/src/main
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2020-02-06 21:48:12 -0500
committerJim Hahn <jrh3@att.com>2020-02-07 13:21:52 -0500
commite06578535f6afadac715c04ed03c74c05a075780 (patch)
tree1c9a0141daf15b93cb4f6452703d92cf8a6d751c /models-interactions/model-actors/actorServiceProvider/src/main
parentaccad88260f99c1b5c5329285b73aa84349e623b (diff)
Clean up and enhancement of Actor re-design
Added junits for the remaining code. Enhancements to facilitate implementation of Operators: - Added allOf(), anyOf() facilities - Added AsyncResponseHandler for handling asynchronous I/O via the HttpClient - Added logRestRequest() and logRestResponse() for logging REST requests and responses - Added HttpActor and HttpOperator, which can be used as superclasses - Added doTask() - Lifted data from the event into ControlLoopEventContext Updates per previous review comments: - Changed logException() to runFunction(). - Removed the aaiCqResponse field. - Lifted fields from Policy into ControlLoopOperationParams, eliminating the need to include Policy in the class. OperatorPartial depends on the string values in the ControlLoopOperation being set to one of the string values of PolicyResult. Instead of passing ControlLoopOperation around, the operators should pass around an object that uses PolicyResult directly, rather than depending on the string values being set correctly. Created OperationOutcome for this purpose. Stop pipeline when the controller completes. Use whenComplete() where appropriate. startOperationAsync() should not block. Modified it to launch the task in the background via its own thread. Extracted CallbackManager into its own file. Replaced actor setOperators() with addOperator() Renamed add() to wrap(), and modified it to remove the future when it completes. Fixed the signature on delayedRemove() and delayedComplete(). Replaced xxxAsync() calls with just xxx() calls, where appropriate to avoid the extra overhead of submitting it to a work queue. Renamed handleFailure() to handlePreprocessorFailure(). Updates per WIP review comments Issue-ID: POLICY-1625 Signed-off-by: Jim Hahn <jrh3@att.com> Change-Id: Id4c4c7ade979bdb76cc54266837609cc69a22c58
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java7
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java119
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java84
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java116
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java3
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java81
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java23
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java50
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java58
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java84
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java640
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java108
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java2
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java119
14 files changed, 1100 insertions, 394 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
index 13f09b1ad..2886b1feb 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
@@ -63,7 +63,6 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
return newActor;
}
- // TODO: should this throw an exception?
logger.warn("duplicate actor names for {}: {}, ignoring {}", name,
existingActor.getClass().getSimpleName(), newActor.getClass().getSimpleName());
return existingActor;
@@ -158,7 +157,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
for (Actor actor : name2actor.values()) {
if (actor.isConfigured()) {
- Util.logException(actor::start, "failed to start actor {}", actor.getName());
+ Util.runFunction(actor::start, "failed to start actor {}", actor.getName());
} else {
logger.warn("not starting unconfigured actor {}", actor.getName());
@@ -170,7 +169,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
protected void doStop() {
logger.info("stopping actors");
name2actor.values()
- .forEach(actor -> Util.logException(actor::stop, "failed to stop actor {}", actor.getName()));
+ .forEach(actor -> Util.runFunction(actor::stop, "failed to stop actor {}", actor.getName()));
}
@Override
@@ -179,7 +178,7 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
// @formatter:off
name2actor.values().forEach(
- actor -> Util.logException(actor::shutdown, "failed to shutdown actor {}", actor.getName()));
+ actor -> Util.runFunction(actor::shutdown, "failed to shutdown actor {}", actor.getName()));
// @formatter:on
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java
new file mode 100644
index 000000000..d78403809
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java
@@ -0,0 +1,119 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import javax.ws.rs.client.InvocationCallback;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler for a <i>single</i> asynchronous response.
+ *
+ * @param <T> response type
+ */
+@Getter
+public abstract class AsyncResponseHandler<T> implements InvocationCallback<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class);
+
+ @Getter(AccessLevel.NONE)
+ private final PipelineControllerFuture<OperationOutcome> result = new PipelineControllerFuture<>();
+ private final ControlLoopOperationParams params;
+ private final OperationOutcome outcome;
+
+ /**
+ * Constructs the object.
+ *
+ * @param params operation parameters
+ * @param outcome outcome to be populated based on the response
+ */
+ public AsyncResponseHandler(ControlLoopOperationParams params, OperationOutcome outcome) {
+ this.params = params;
+ this.outcome = outcome;
+ }
+
+ /**
+ * Handles the given future, arranging to cancel it when the response is received.
+ *
+ * @param future future to be handled
+ * @return a future to be used to cancel or wait for the response
+ */
+ public CompletableFuture<OperationOutcome> handle(Future<T> future) {
+ result.add(future);
+ return result;
+ }
+
+ /**
+ * Invokes {@link #doComplete()} and then completes "this" with the returned value.
+ */
+ @Override
+ public void completed(T rawResponse) {
+ try {
+ logger.trace("{}.{}: response completed for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ result.complete(doComplete(rawResponse));
+
+ } catch (RuntimeException e) {
+ logger.trace("{}.{}: response handler threw an exception for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ result.completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Invokes {@link #doFailed()} and then completes "this" with the returned value.
+ */
+ @Override
+ public void failed(Throwable throwable) {
+ try {
+ logger.trace("{}.{}: response failure for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ result.complete(doFailed(throwable));
+
+ } catch (RuntimeException e) {
+ logger.trace("{}.{}: response failure handler threw an exception for {}", params.getActor(),
+ params.getOperation(), params.getRequestId());
+ result.completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Completes the processing of a response.
+ *
+ * @param rawResponse raw response that was received
+ * @return the outcome
+ */
+ protected abstract OperationOutcome doComplete(T rawResponse);
+
+ /**
+ * Handles a response exception.
+ *
+ * @param thrown exception that was thrown
+ * @return the outcome
+ */
+ protected abstract OperationOutcome doFailed(Throwable thrown);
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java
new file mode 100644
index 000000000..7d7c1d902
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/CallbackManager.java
@@ -0,0 +1,84 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Manager for "start" and "end" callbacks.
+ */
+public class CallbackManager implements Runnable {
+ private final AtomicReference<Instant> startTime = new AtomicReference<>();
+ private final AtomicReference<Instant> endTime = new AtomicReference<>();
+
+ /**
+ * Determines if the "start" callback can be invoked. If so, it sets the
+ * {@link #startTime} to the current time.
+ *
+ * @return {@code true} if the "start" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canStart() {
+ return startTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Determines if the "end" callback can be invoked. If so, it sets the
+ * {@link #endTime} to the current time.
+ *
+ * @return {@code true} if the "end" callback can be invoked, {@code false}
+ * otherwise
+ */
+ public boolean canEnd() {
+ return endTime.compareAndSet(null, Instant.now());
+ }
+
+ /**
+ * Gets the start time.
+ *
+ * @return the start time, or {@code null} if {@link #canStart()} has not been
+ * invoked yet.
+ */
+ public Instant getStartTime() {
+ return startTime.get();
+ }
+
+ /**
+ * Gets the end time.
+ *
+ * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
+ * yet.
+ */
+ public Instant getEndTime() {
+ return endTime.get();
+ }
+
+ /**
+ * Prevents further callbacks from being executed by setting {@link #startTime}
+ * and {@link #endTime}.
+ */
+ @Override
+ public void run() {
+ canStart();
+ canEnd();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java
new file mode 100644
index 000000000..6b0924807
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/OperationOutcome.java
@@ -0,0 +1,116 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider;
+
+import java.time.Instant;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.policy.PolicyResult;
+
+/**
+ * Outcome from an operation. Objects of this type are passed from one stage to the next.
+ */
+@Data
+@NoArgsConstructor
+public class OperationOutcome {
+ private String actor;
+ private String operation;
+ private String target;
+ private Instant start;
+ private Instant end;
+ private String subRequestId;
+ private PolicyResult result = PolicyResult.SUCCESS;
+ private String message;
+
+ /**
+ * Copy constructor.
+ *
+ * @param source source object from which to copy
+ */
+ public OperationOutcome(OperationOutcome source) {
+ this.actor = source.actor;
+ this.operation = source.operation;
+ this.target = source.target;
+ this.start = source.start;
+ this.end = source.end;
+ this.subRequestId = source.subRequestId;
+ this.result = source.result;
+ this.message = source.message;
+ }
+
+ /**
+ * Creates a {@link ControlLoopOperation}, populating all fields with the values from
+ * this object. Sets the outcome field to the string representation of this object's
+ * outcome.
+ *
+ * @return
+ */
+ public ControlLoopOperation toControlLoopOperation() {
+ ControlLoopOperation clo = new ControlLoopOperation();
+
+ clo.setActor(actor);
+ clo.setOperation(operation);
+ clo.setTarget(target);
+ clo.setStart(start);
+ clo.setEnd(end);
+ clo.setSubRequestId(subRequestId);
+ clo.setOutcome(result.toString());
+ clo.setMessage(message);
+
+ return clo;
+ }
+
+ /**
+ * Determines if this outcome is for the given actor and operation.
+ *
+ * @param actor actor name
+ * @param operation operation name
+ * @return {@code true} if this outcome is for the given actor and operation
+ */
+ public boolean isFor(@NonNull String actor, @NonNull String operation) {
+ // do the operation check first, as it's most likely to be unique
+ return (operation.equals(this.operation) && actor.equals(this.actor));
+ }
+
+ /**
+ * Determines if an outcome is for the given actor and operation.
+ *
+ * @param outcome outcome to be examined, or {@code null}
+ * @param actor actor name
+ * @param operation operation name
+ * @return {@code true} if this outcome is for the given actor and operation,
+ * {@code false} it is {@code null} or not for the actor/operation
+ */
+ public static boolean isFor(OperationOutcome outcome, String actor, String operation) {
+ return (outcome != null && outcome.isFor(actor, operation));
+ }
+
+ /**
+ * Sets the result.
+ *
+ * @param result new result
+ */
+ public void setResult(@NonNull PolicyResult result) {
+ this.result = result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
index e308ee42e..c09460e34 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.onap.policy.common.capabilities.Configurable;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
/**
@@ -54,5 +53,5 @@ public interface Operator extends Startable, Configurable<Map<String, Object>> {
* @param params parameters needed to start the operation
* @return a future that can be used to cancel or await the result of the operation
*/
- CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params);
+ CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params);
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
index 0aba1a7fa..c3ddd17f3 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
@@ -23,6 +23,9 @@ package org.onap.policy.controlloop.actorserviceprovider;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
@@ -53,6 +56,82 @@ public class Util {
}
/**
+ * Logs a REST request. If the request is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param url request URL
+ * @param request request to be logged
+ */
+ public static <T> void logRestRequest(String url, T request) {
+ logRestRequest(new StandardCoder(), url, request);
+ }
+
+ /**
+ * Logs a REST request. If the request is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param coder coder to be used to pretty-print the request
+ * @param url request URL
+ * @param request request to be logged
+ */
+ protected static <T> void logRestRequest(Coder coder, String url, T request) {
+ String json;
+ try {
+ if (request instanceof String) {
+ json = request.toString();
+ } else {
+ json = coder.encode(request, true);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot pretty-print request", e);
+ json = request.toString();
+ }
+
+ NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json);
+ logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
+ }
+
+ /**
+ * Logs a REST response. If the response is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param url request URL
+ * @param response response to be logged
+ */
+ public static <T> void logRestResponse(String url, T response) {
+ logRestResponse(new StandardCoder(), url, response);
+ }
+
+ /**
+ * Logs a REST response. If the request is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param coder coder to be used to pretty-print the response
+ * @param url request URL
+ * @param response response to be logged
+ */
+ protected static <T> void logRestResponse(Coder coder, String url, T response) {
+ String json;
+ try {
+ if (response == null) {
+ json = null;
+ } else if (response instanceof String) {
+ json = response.toString();
+ } else {
+ json = coder.encode(response, true);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot pretty-print response", e);
+ json = response.toString();
+ }
+
+ NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json);
+ logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
+ }
+
+ /**
* Runs a function and logs a message if it throws an exception. Does <i>not</i>
* re-throw the exception.
*
@@ -60,7 +139,7 @@ public class Util {
* @param exceptionMessage message to log if an exception is thrown
* @param exceptionArgs arguments to be passed to the logger
*/
- public static void logException(Runnable function, String exceptionMessage, Object... exceptionArgs) {
+ public static void runFunction(Runnable function, String exceptionMessage, Object... exceptionArgs) {
try {
function.run();
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
index 68bbe7edc..cd4d2570f 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
@@ -22,11 +22,12 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop;
import java.io.Serializable;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel;
import lombok.Getter;
+import lombok.NonNull;
import lombok.Setter;
-import org.onap.policy.aai.AaiCqResponse;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
/**
@@ -37,23 +38,35 @@ import org.onap.policy.controlloop.VirtualControlLoopEvent;
public class ControlLoopEventContext implements Serializable {
private static final long serialVersionUID = 1L;
- @Setter(AccessLevel.NONE)
+
private final VirtualControlLoopEvent event;
- private AaiCqResponse aaiCqResponse;
+ /**
+ * Enrichment data extracted from the event. Never {@code null}, though it may be
+ * immutable.
+ */
+ private final Map<String, String> enrichment;
- // TODO may remove this if it proves not to be needed
@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
private Map<String, Serializable> properties = new ConcurrentHashMap<>();
/**
+ * Request ID extracted from the event, or a generated value if the event has no
+ * request id; never {@code null}.
+ */
+ private final UUID requestId;
+
+
+ /**
* Constructs the object.
*
* @param event event with which this is associated
*/
- public ControlLoopEventContext(VirtualControlLoopEvent event) {
+ public ControlLoopEventContext(@NonNull VirtualControlLoopEvent event) {
this.event = event;
+ this.requestId = (event.getRequestId() != null ? event.getRequestId() : UUID.randomUUID());
+ this.enrichment = (event.getAai() != null ? event.getAai() : Map.of());
}
/**
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
index 9b9aa914e..d7f322e8a 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
@@ -20,14 +20,12 @@
package org.onap.policy.controlloop.actorserviceprovider.impl;
-import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
@@ -46,44 +44,42 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
/**
* Maps a name to an operator.
*/
- private Map<String, Operator> name2operator;
+ private final Map<String, Operator> name2operator = new ConcurrentHashMap<>();
/**
* Constructs the object.
*
* @param name actor name
- * @param operators the operations supported by this actor
*/
- public ActorImpl(String name, Operator... operators) {
+ public ActorImpl(String name) {
super(name);
- setOperators(Arrays.asList(operators));
}
/**
- * Sets the operators supported by this actor, overriding any previous list.
+ * Adds an operator supported by this actor.
*
- * @param operators the operations supported by this actor
+ * @param operator operation to be added
*/
- protected void setOperators(List<Operator> operators) {
+ protected synchronized void addOperator(Operator operator) {
+ /*
+ * This method is "synchronized" to prevent the state from changing while the
+ * operator is added. The map, itself, does not need synchronization as it's a
+ * concurrent map.
+ */
+
if (isConfigured()) {
throw new IllegalStateException("attempt to set operators on a configured actor: " + getName());
}
- Map<String, Operator> map = new HashMap<>();
- for (Operator newOp : operators) {
- map.compute(newOp.getName(), (opName, existingOp) -> {
- if (existingOp == null) {
- return newOp;
- }
-
- // TODO: should this throw an exception?
- logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName,
- existingOp.getClass().getSimpleName(), newOp.getClass().getSimpleName());
- return existingOp;
- });
- }
+ name2operator.compute(operator.getName(), (opName, existingOp) -> {
+ if (existingOp == null) {
+ return operator;
+ }
- this.name2operator = ImmutableMap.copyOf(map);
+ logger.warn("duplicate names for actor operation {}.{}: {}, ignoring {}", getName(), opName,
+ existingOp.getClass().getSimpleName(), operator.getClass().getSimpleName());
+ return existingOp;
+ });
}
@Override
@@ -177,7 +173,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
for (Operator oper : name2operator.values()) {
if (oper.isConfigured()) {
- Util.logException(oper::start, "failed to start operation {}.{}", actorName, oper.getName());
+ Util.runFunction(oper::start, "failed to start operation {}.{}", actorName, oper.getName());
} else {
logger.warn("not starting unconfigured operation {}.{}", actorName, oper.getName());
@@ -195,7 +191,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
// @formatter:off
name2operator.values().forEach(
- oper -> Util.logException(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName()));
+ oper -> Util.runFunction(oper::stop, "failed to stop operation {}.{}", actorName, oper.getName()));
// @formatter:on
}
@@ -208,7 +204,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
logger.info("shutting down operations for actor {}", actorName);
// @formatter:off
- name2operator.values().forEach(oper -> Util.logException(oper::shutdown,
+ name2operator.values().forEach(oper -> Util.runFunction(oper::shutdown,
"failed to shutdown operation {}.{}", actorName, oper.getName()));
// @formatter:on
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java
new file mode 100644
index 000000000..28b7b3924
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActor.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpActorParams;
+
+/**
+ * Actor that uses HTTP, where the only additional property that an operator needs is a
+ * URL. The actor's parameters must be an {@link HttpActorParams} and its operator
+ * parameters are expected to be an {@link HttpParams}.
+ */
+public class HttpActor extends ActorImpl {
+
+ /**
+ * Constructs the object.
+ *
+ * @param name actor's name
+ */
+ public HttpActor(String name) {
+ super(name);
+ }
+
+ /**
+ * Translates the parameters to an {@link HttpActorParams} and then creates a function
+ * that will extract operator-specific parameters.
+ */
+ @Override
+ protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+ String actorName = getName();
+
+ // @formatter:off
+ return Util.translate(actorName, actorParameters, HttpActorParams.class)
+ .doValidation(actorName)
+ .makeOperationParameters(actorName);
+ // @formatter:on
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java
new file mode 100644
index 000000000..566492907
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java
@@ -0,0 +1,84 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.http.client.HttpClient;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+
+/**
+ * Operator that uses HTTP. The operator's parameters must be a {@link HttpParams}.
+ */
+public class HttpOperator extends OperatorPartial {
+
+ @Getter(AccessLevel.PROTECTED)
+ private HttpClient client;
+
+ @Getter
+ private long timeoutSec;
+
+ /**
+ * URI path for this particular operation.
+ */
+ @Getter
+ private String path;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param actorName name of the actor with which this operator is associated
+ * @param name operation name
+ */
+ public HttpOperator(String actorName, String name) {
+ super(actorName, name);
+ }
+
+ /**
+ * Translates the parameters to an {@link HttpParams} and then extracts the relevant
+ * values.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ HttpParams params = Util.translate(getFullName(), parameters, HttpParams.class);
+ ValidationResult result = params.validate(getFullName());
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+
+ client = getClientFactory().get(params.getClientName());
+ path = params.getPath();
+ timeoutSec = params.getTimeoutSec();
+ }
+
+ // these may be overridden by junits
+
+ protected HttpClientFactory getClientFactory() {
+ return HttpClientFactoryInstance.getClientFactory();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
index 80d8fbd04..df5258d71 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
@@ -20,37 +20,61 @@
package org.onap.policy.controlloop.actorserviceprovider.impl;
-import java.time.Instant;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.Function;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.Policy;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Partial implementation of an operator. Subclasses can choose to simply implement
- * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
- * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
+ * Partial implementation of an operator. In general, it's preferable that subclasses
+ * would override
+ * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome)
+ * startOperationAsync()}. However, if that proves to be too difficult, then they can
+ * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome)
+ * doOperation()}. In addition, if the operation requires any preprocessor steps, the
+ * subclass may choose to override
+ * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}.
+ * <p/>
+ * The futures returned by the methods within this class can be canceled, and will
+ * propagate the cancellation to any subtasks. Thus it is also expected that any futures
+ * returned by overridden methods will do the same. Of course, if a class overrides
+ * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()},
+ * then there's little that can be done to cancel that particular operation.
*/
public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
- private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
- private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
- private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
+ /**
+ * Executor to be used for tasks that may perform blocking I/O. The default executor
+ * simply launches a new thread for each command that is submitted to it.
+ * <p/>
+ * May be overridden by junit tests.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private Executor blockingExecutor = command -> {
+ Thread thread = new Thread(command);
+ thread.setDaemon(true);
+ thread.start();
+ };
@Getter
private final String actorName;
@@ -103,94 +127,52 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
}
@Override
- public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
+ public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) {
if (!isAlive()) {
throw new IllegalStateException("operation is not running: " + getFullName());
}
- final Executor executor = params.getExecutor();
-
// allocate a controller for the entire operation
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
+ CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params);
if (preproc == null) {
// no preprocessor required - just start the operation
return startOperationAttempt(params, controller, 1);
}
- // propagate "stop" to the preprocessor
- controller.add(preproc);
-
/*
* Do preprocessor first and then, if successful, start the operation. Note:
* operations create their own outcome, ignoring the outcome from any previous
* steps.
+ *
+ * Wrap the preprocessor to ensure "stop" is propagated to it.
*/
- preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
- .thenComposeAsync(handleFailure(params, controller), executor)
- .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
- executor);
-
- return controller;
- }
-
- /**
- * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
- * invokes the started and completed call-backs.
- *
- * @param params operation parameters
- * @return a future that will return the preprocessor outcome, or {@code null} if this
- * operation needs no preprocessor
- */
- protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
- logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
-
- final Executor executor = params.getExecutor();
- final ControlLoopOperation operation = params.makeOutcome();
-
- final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
- doPreprocessorAsFuture(params);
- if (preproc == null) {
- // no preprocessor required
- return null;
- }
-
- // allocate a controller for the preprocessor steps
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
-
- /*
- * Don't mark it complete until we've built the whole pipeline. This will prevent
- * the operation from starting until after it has been successfully built (i.e.,
- * without generating any exceptions).
- */
- final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
-
// @formatter:off
- firstFuture
- .thenComposeAsync(controller.add(preproc), executor)
- .exceptionally(fromException(params, operation))
- .whenCompleteAsync(controller.delayedComplete(), executor);
+ controller.wrap(preproc)
+ .exceptionally(fromException(params, "preprocessor of operation"))
+ .thenCompose(handlePreprocessorFailure(params, controller))
+ .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1));
// @formatter:on
- // start the pipeline
- firstFuture.complete(operation);
-
return controller;
}
/**
* Handles a failure in the preprocessor pipeline. If a failure occurred, then it
- * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
- * outcome that it received.
+ * invokes the call-backs, marks the controller complete, and returns an incomplete
+ * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
+ * received.
+ * <p/>
+ * Assumes that no callbacks have been invoked yet.
*
* @param params operation parameters
* @param controller pipeline controller
* @return a function that checks the outcome status and continues, if successful, or
* indicates a failure otherwise
*/
- private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
- ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) {
return outcome -> {
@@ -207,19 +189,21 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
// propagate "stop" to the callbacks
controller.add(callbacks);
- final ControlLoopOperation outcome2 = params.makeOutcome();
+ final OperationOutcome outcome2 = params.makeOutcome();
// TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
- outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
+ outcome2.setResult(PolicyResult.FAILURE_GUARD);
outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
- CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenApplyAsync(callbackCompleted(params, callbacks), executor)
- .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
+ // @formatter:off
+ CompletableFuture.completedFuture(outcome2)
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
.whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
- return controller;
+ return new CompletableFuture<>();
};
}
@@ -237,8 +221,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @return a function that will start the preprocessor and returns its outcome, or
* {@code null} if this operation needs no preprocessor
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
- ControlLoopOperationParams params) {
+ protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
return null;
}
@@ -251,20 +234,12 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param attempt attempt number, typically starting with 1
* @return a future that will return the final result of all attempts
*/
- private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
- PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
-
- final Executor executor = params.getExecutor();
-
- CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
+ private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller, int attempt) {
// propagate "stop" to the operation attempt
- controller.add(future);
-
- // detach when complete
- future.whenCompleteAsync(controller.delayedRemove(future), executor)
- .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
- .whenCompleteAsync(controller.delayedComplete(), executor);
+ controller.wrap(startAttemptWithoutRetries(params, attempt))
+ .thenCompose(retryOnFailure(params, controller, attempt));
return controller;
}
@@ -276,40 +251,32 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param attempt attempt number, typically starting with 1
* @return a future that will return the result of a single operation attempt
*/
- private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
+ private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params,
int attempt) {
logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
final Executor executor = params.getExecutor();
- final ControlLoopOperation outcome = params.makeOutcome();
+ final OperationOutcome outcome = params.makeOutcome();
final CallbackManager callbacks = new CallbackManager();
// this operation attempt gets its own controller
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
// propagate "stop" to the callbacks
controller.add(callbacks);
- /*
- * Don't mark it complete until we've built the whole pipeline. This will prevent
- * the operation from starting until after it has been successfully built (i.e.,
- * without generating any exceptions).
- */
- final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
-
// @formatter:off
- CompletableFuture<ControlLoopOperation> future2 =
- firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
- .thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
+ CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2)));
// @formatter:on
// handle timeouts, if specified
- long timeoutMillis = getTimeOutMillis(params.getPolicy());
+ long timeoutMillis = getTimeOutMillis(params.getTimeoutSec());
if (timeoutMillis > 0) {
logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
- future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
}
/*
@@ -321,16 +288,13 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
*/
// @formatter:off
- future2.exceptionally(fromException(params, outcome))
- .thenApplyAsync(setRetryFlag(params, attempt), executor)
- .thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ future.exceptionally(fromException(params, "operation"))
+ .thenApply(setRetryFlag(params, attempt))
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
.whenCompleteAsync(controller.delayedComplete(), executor);
// @formatter:on
- // start the pipeline
- firstFuture.complete(outcome);
-
return controller;
}
@@ -340,8 +304,8 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param outcome outcome to examine
* @return {@code true} if the outcome was successful
*/
- protected boolean isSuccess(ControlLoopOperation outcome) {
- return OUTCOME_SUCCESS.equals(outcome.getOutcome());
+ protected boolean isSuccess(OperationOutcome outcome) {
+ return (outcome.getResult() == PolicyResult.SUCCESS);
}
/**
@@ -351,13 +315,29 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @return {@code true} if the outcome is not {@code null} and was a failure
* <i>and</i> was associated with this operator, {@code false} otherwise
*/
- protected boolean isActorFailed(ControlLoopOperation outcome) {
- return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
+ protected boolean isActorFailed(OperationOutcome outcome) {
+ return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
+ }
+
+ /**
+ * Determines if the given outcome is for this operation.
+ *
+ * @param outcome outcome to examine
+ * @return {@code true} if the outcome is for this operation, {@code false} otherwise
+ */
+ protected boolean isSameOperation(OperationOutcome outcome) {
+ return OperationOutcome.isFor(outcome, getActorName(), getName());
}
/**
* Invokes the operation as a "future". This method simply invokes
- * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
+ * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor
+ * "blocking executor"}, returning the result via a "future".
+ * <p/>
+ * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
+ * the executor in the "params", as that may bring the background thread pool to a
+ * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
+ * instead.
* <p/>
* This method assumes the following:
* <ul>
@@ -373,31 +353,23 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @return a function that will start the operation and return its result when
* complete
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
- ControlLoopOperationParams params, int attempt) {
-
- /*
- * TODO As doOperation() may perform blocking I/O, this should be launched in its
- * own thread to prevent the ForkJoinPool from being tied up. Should probably
- * provide a method to make that easy.
- */
+ protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt,
+ OperationOutcome outcome) {
- return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
- params.getExecutor());
+ return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor());
}
/**
* Low-level method that performs the operation. This can make the same assumptions
* that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
- * method throws an {@link UnsupportedOperationException}.
+ * particular method simply throws an {@link UnsupportedOperationException}.
*
* @param params operation parameters
* @param attempt attempt number, typically starting with 1
* @param operation the operation being performed
* @return the outcome of the operation
*/
- protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
- ControlLoopOperation operation) {
+ protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) {
throw new UnsupportedOperationException("start operation " + getFullName());
}
@@ -411,8 +383,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param attempt latest attempt number, starting with 1
* @return a function to get the next future to execute
*/
- private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
- int attempt) {
+ private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) {
return operation -> {
if (operation != null && !isActorFailed(operation)) {
@@ -424,22 +395,22 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
}
// get a non-null operation
- ControlLoopOperation oper2;
+ OperationOutcome oper2;
if (operation != null) {
oper2 = operation;
} else {
oper2 = params.makeOutcome();
- oper2.setOutcome(OUTCOME_FAILURE);
+ oper2.setResult(PolicyResult.FAILURE);
}
- if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
- && attempt > params.getPolicy().getRetry()) {
+ Integer retry = params.getRetry();
+ if (retry != null && retry > 0 && attempt > retry) {
/*
* retries were specified and we've already tried them all - change to
* FAILURE_RETRIES
*/
logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
- oper2.setOutcome(OUTCOME_RETRIES);
+ oper2.setResult(PolicyResult.FAILURE_RETRIES);
}
return oper2;
@@ -456,21 +427,24 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param attempt latest attempt number, starting with 1
* @return a function to get the next future to execute
*/
- private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
- ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller,
int attempt) {
return operation -> {
if (!isActorFailed(operation)) {
// wrong type or wrong operation - just leave it as is
logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ controller.complete(operation);
+ return new CompletableFuture<>();
}
- if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
+ Integer retry = params.getRetry();
+ if (retry == null || retry <= 0) {
// no retries - already marked as FAILURE, so just return it
logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ controller.complete(operation);
+ return new CompletableFuture<>();
}
@@ -484,100 +458,279 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
}
/**
- * Gets the outcome of an operation for this operation.
+ * Converts an exception into an operation outcome, returning a copy of the outcome to
+ * prevent background jobs from changing it.
*
- * @param operation operation whose outcome is to be extracted
- * @return the outcome of the given operation, if it's for this operator, {@code null}
- * otherwise
+ * @param params operation parameters
+ * @param type type of item throwing the exception
+ * @return a function that will convert an exception into an operation outcome
*/
- protected String getActorOutcome(ControlLoopOperation operation) {
- if (operation == null) {
- return null;
- }
+ private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) {
- if (!getActorName().equals(operation.getActor())) {
- return null;
- }
+ return thrown -> {
+ OperationOutcome outcome = params.makeOutcome();
+
+ logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+ params.getRequestId(), thrown);
+
+ return setOutcome(params, outcome, thrown);
+ };
+ }
+
+ /**
+ * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+ * any outstanding futures when one completes.
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
+ List<CompletableFuture<OperationOutcome>> futures) {
+
+ // convert list to an array
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures);
+ return result;
+ }
+
+ /**
+ * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
+ * outstanding futures when one completes.
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ final Executor executor = params.getExecutor();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ attachFutures(controller, futures);
+
+ // @formatter:off
+ CompletableFuture.anyOf(futures)
+ .thenApply(object -> (OperationOutcome) object)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ return controller;
+ }
+
+ /**
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
+ * the futures if returned future is canceled. The future returns the "worst" outcome,
+ * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
+ List<CompletableFuture<OperationOutcome>> futures) {
- if (!getName().equals(operation.getOperation())) {
- return null;
+ // convert list to an array
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result = allOf(params, arrFutures);
+ return result;
+ }
+
+ /**
+ * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
+ * futures if returned future is canceled. The future returns the "worst" outcome,
+ * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ attachFutures(controller, futures);
+
+ OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+
+ // record the outcomes of each future when it completes
+ for (int count = 0; count < futures2.length; ++count) {
+ final int count2 = count;
+ futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
}
- return operation.getOutcome();
+ CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes));
+
+ return controller;
}
/**
- * Gets a function that will start the next step, if the current operation was
- * successful, or just return the current operation, otherwise.
+ * Attaches the given futures to the controller.
+ *
+ * @param controller master controller for all of the futures
+ * @param futures futures to be attached to the controller
+ */
+ private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ // attach each task
+ for (CompletableFuture<OperationOutcome> future : futures) {
+ controller.add(future);
+ }
+ }
+
+ /**
+ * Combines the outcomes from a set of tasks.
*
* @param params operation parameters
- * @param nextStep function that will invoke the next step, passing it the operation
- * @return a function that will start the next step
+ * @param future future to be completed with the combined result
+ * @param outcomes outcomes to be examined
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
- ControlLoopOperationParams params,
- Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
+ private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params,
+ CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) {
- return operation -> {
+ return (unused, thrown) -> {
+ if (thrown != null) {
+ future.completeExceptionally(thrown);
+ return;
+ }
- if (operation == null) {
- logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
- ControlLoopOperation outcome = params.makeOutcome();
- outcome.setOutcome(OUTCOME_FAILURE);
- return CompletableFuture.completedFuture(outcome);
+ // identify the outcome with the highest priority
+ OperationOutcome outcome = outcomes[0];
+ int priority = detmPriority(outcome);
- } else if (isSuccess(operation)) {
- logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
- return nextStep.apply(operation);
+ // start with "1", as we've already dealt with "0"
+ for (int count = 1; count < outcomes.length; ++count) {
+ OperationOutcome outcome2 = outcomes[count];
+ int priority2 = detmPriority(outcome2);
- } else {
- logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ if (priority2 > priority) {
+ outcome = outcome2;
+ priority = priority2;
+ }
}
+
+ logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(),
+ (outcome == null ? null : outcome.getResult()), params.getRequestId());
+
+ future.complete(outcome);
};
}
/**
- * Converts an exception into an operation outcome, returning a copy of the outcome to
- * prevent background jobs from changing it.
+ * Determines the priority of an outcome based on its result.
*
- * @param params operation parameters
- * @param operation current operation
- * @return a function that will convert an exception into an operation outcome
+ * @param outcome outcome to examine, or {@code null}
+ * @return the outcome's priority
*/
- private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
- ControlLoopOperation operation) {
+ protected int detmPriority(OperationOutcome outcome) {
+ if (outcome == null) {
+ return 1;
+ }
- return thrown -> {
- logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
- params.getRequestId(), thrown);
+ switch (outcome.getResult()) {
+ case SUCCESS:
+ return 0;
+
+ case FAILURE_GUARD:
+ return 2;
+
+ case FAILURE_RETRIES:
+ return 3;
+
+ case FAILURE:
+ return 4;
+
+ case FAILURE_TIMEOUT:
+ return 5;
+
+ case FAILURE_EXCEPTION:
+ default:
+ return 6;
+ }
+ }
+
+ /**
+ * Performs a task, after verifying that the controller is still running. Also checks
+ * that the previous outcome was successful, if specified.
+ *
+ * @param params operation parameters
+ * @param controller overall pipeline controller
+ * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+ * otherwise
+ * @param outcome outcome of the previous task
+ * @param tasks tasks to be performed
+ * @return a function to perform the task. If everything checks out, then it returns
+ * the task's future. Otherwise, it returns an incomplete future and completes
+ * the controller instead.
+ */
+ // @formatter:off
+ protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller,
+ boolean checkSuccess, OperationOutcome outcome,
+ CompletableFuture<OperationOutcome> task) {
+ // @formatter:on
+ if (checkSuccess && !isSuccess(outcome)) {
/*
- * Must make a copy of the operation, as the original could be changed by
- * background jobs that might still be running.
+ * must complete before canceling so that cancel() doesn't cause controller to
+ * complete
*/
- return setOutcome(params, new ControlLoopOperation(operation), thrown);
- };
+ controller.complete(outcome);
+ task.cancel(false);
+ return new CompletableFuture<>();
+ }
+
+ return controller.wrap(task);
}
/**
- * Gets a function to verify that the operation is still running. If the pipeline is
- * not running, then it returns an incomplete future, which will effectively halt
- * subsequent operations in the pipeline. This method is intended to be used with one
- * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
+ * Performs a task, after verifying that the controller is still running. Also checks
+ * that the previous outcome was successful, if specified.
*
- * @param controller pipeline controller
* @param params operation parameters
- * @return a function to verify that the operation is still running
+ * @param controller overall pipeline controller
+ * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+ * otherwise
+ * @param tasks tasks to be performed
+ * @return a function to perform the task. If everything checks out, then it returns
+ * the task's future. Otherwise, it returns an incomplete future and completes
+ * the controller instead.
*/
- protected <T> Function<T, CompletableFuture<T>> verifyRunning(
- PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
+ // @formatter:off
+ protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller,
+ boolean checkSuccess,
+ Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
+ // @formatter:on
+
+ return outcome -> {
+
+ if (!controller.isRunning()) {
+ return new CompletableFuture<>();
+ }
- return value -> {
- boolean running = controller.isRunning();
- logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
+ if (checkSuccess && !isSuccess(outcome)) {
+ controller.complete(outcome);
+ return new CompletableFuture<>();
+ }
- return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
+ return controller.wrap(task.apply(outcome));
};
}
@@ -591,10 +744,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param callbacks used to determine if the start callback can be invoked
* @return a function that sets the start time and invokes the callback
*/
- private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
+ private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params,
CallbackManager callbacks) {
- return outcome -> {
+ return (outcome, thrown) -> {
if (callbacks.canStart()) {
// haven't invoked "start" callback yet
@@ -602,8 +755,6 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
outcome.setEnd(null);
params.callbackStarted(outcome);
}
-
- return outcome;
};
}
@@ -621,18 +772,16 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param callbacks used to determine if the end callback can be invoked
* @return a function that sets the end time and invokes the callback
*/
- private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
+ private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params,
CallbackManager callbacks) {
- return operation -> {
+ return (outcome, thrown) -> {
if (callbacks.canEnd()) {
- operation.setStart(callbacks.getStartTime());
- operation.setEnd(callbacks.getEndTime());
- params.callbackCompleted(operation);
+ outcome.setStart(callbacks.getStartTime());
+ outcome.setEnd(callbacks.getEndTime());
+ params.callbackCompleted(outcome);
}
-
- return operation;
};
}
@@ -643,7 +792,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param operation operation to be updated
* @return the updated operation
*/
- protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
Throwable thrown) {
PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
return setOutcome(params, operation, result);
@@ -657,10 +806,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* @param result result of the operation
* @return the updated operation
*/
- protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
PolicyResult result) {
logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
- operation.setOutcome(result.toString());
+ operation.setResult(result);
operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
: ControlLoopOperation.FAILED_MSG);
@@ -687,71 +836,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
* Gets the operation timeout. Subclasses may override this method to obtain the
* timeout in some other way (e.g., through configuration properties).
*
- * @param policy policy from which to extract the timeout
+ * @param timeoutSec timeout, in seconds, or {@code null}
* @return the operation timeout, in milliseconds
*/
- protected long getTimeOutMillis(Policy policy) {
- Integer timeoutSec = policy.getTimeout();
+ protected long getTimeOutMillis(Integer timeoutSec) {
return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
}
-
- /**
- * Manager for "start" and "end" callbacks.
- */
- private static class CallbackManager implements Runnable {
- private final AtomicReference<Instant> startTime = new AtomicReference<>();
- private final AtomicReference<Instant> endTime = new AtomicReference<>();
-
- /**
- * Determines if the "start" callback can be invoked. If so, it sets the
- * {@link #startTime} to the current time.
- *
- * @return {@code true} if the "start" callback can be invoked, {@code false}
- * otherwise
- */
- public boolean canStart() {
- return startTime.compareAndSet(null, Instant.now());
- }
-
- /**
- * Determines if the "end" callback can be invoked. If so, it sets the
- * {@link #endTime} to the current time.
- *
- * @return {@code true} if the "end" callback can be invoked, {@code false}
- * otherwise
- */
- public boolean canEnd() {
- return endTime.compareAndSet(null, Instant.now());
- }
-
- /**
- * Gets the start time.
- *
- * @return the start time, or {@code null} if {@link #canStart()} has not been
- * invoked yet.
- */
- public Instant getStartTime() {
- return startTime.get();
- }
-
- /**
- * Gets the end time.
- *
- * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
- * yet.
- */
- public Instant getEndTime() {
- return endTime.get();
- }
-
- /**
- * Prevents further callbacks from being executed by setting {@link #startTime}
- * and {@link #endTime}.
- */
- @Override
- public void run() {
- canStart();
- canEnd();
- }
- }
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
index 08aba81f2..57fce40d7 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
@@ -20,6 +20,7 @@
package org.onap.policy.controlloop.actorserviceprovider.parameters;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -32,11 +33,11 @@ import lombok.Getter;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.common.parameters.BeanValidator;
import org.onap.policy.common.parameters.annotations.NotNull;
-import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Util;
import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
-import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.Target;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,36 +50,67 @@ import org.slf4j.LoggerFactory;
@AllArgsConstructor
@EqualsAndHashCode
public class ControlLoopOperationParams {
-
private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationParams.class);
- public static final String UNKNOWN = "-unknown-";
-
+ /**
+ * Actor name.
+ */
+ @NotNull
+ private String actor;
/**
- * The actor service in which to find the actor/operation.
+ * Actor service in which to find the actor/operation.
*/
@NotNull
private ActorService actorService;
/**
- * The event for which the operation applies.
+ * Event for which the operation applies.
*/
@NotNull
private ControlLoopEventContext context;
/**
- * The executor to use to run the operation.
+ * Executor to use to run the operation.
*/
@NotNull
@Builder.Default
private Executor executor = ForkJoinPool.commonPool();
/**
- * The policy associated with the operation.
+ * Operation name.
+ */
+ @NotNull
+ private String operation;
+
+ /**
+ * Payload data for the request.
+ */
+ private Map<String, String> payload;
+
+ /**
+ * Number of retries allowed, or {@code null} if no retries.
+ */
+ private Integer retry;
+
+ /**
+ * The entity's target information. May be {@code null}, depending on the requirement
+ * of the operation to be invoked.
+ */
+ private Target target;
+
+ /**
+ * Target entity.
*/
@NotNull
- private Policy policy;
+ private String targetEntity;
+
+ /**
+ * Timeout, in seconds, or {@code null} if no timeout. Zero and negative values also
+ * imply no timeout.
+ */
+ @Builder.Default
+ private Integer timeoutSec = 300;
/**
* The function to invoke when the operation starts. This is optional.
@@ -87,7 +119,7 @@ public class ControlLoopOperationParams {
* may happen if the current operation requires other operations to be performed first
* (e.g., A&AI queries, guard checks).
*/
- private Consumer<ControlLoopOperation> startCallback;
+ private Consumer<OperationOutcome> startCallback;
/**
* The function to invoke when the operation completes. This is optional.
@@ -96,13 +128,7 @@ public class ControlLoopOperationParams {
* may happen if the current operation requires other operations to be performed first
* (e.g., A&AI queries, guard checks).
*/
- private Consumer<ControlLoopOperation> completeCallback;
-
- /**
- * Target entity.
- */
- @NotNull
- private String target;
+ private Consumer<OperationOutcome> completeCallback;
/**
* Starts the specified operation.
@@ -110,7 +136,7 @@ public class ControlLoopOperationParams {
* @return a future that will return the result of the operation
* @throws IllegalArgumentException if the parameters are invalid
*/
- public CompletableFuture<ControlLoopOperation> start() {
+ public CompletableFuture<OperationOutcome> start() {
BeanValidationResult result = validate();
if (!result.isValid()) {
logger.warn("parameter error in operation {}.{} for {}:\n{}", getActor(), getOperation(), getRequestId(),
@@ -120,31 +146,13 @@ public class ControlLoopOperationParams {
// @formatter:off
return actorService
- .getActor(policy.getActor())
- .getOperator(policy.getRecipe())
+ .getActor(getActor())
+ .getOperator(getOperation())
.startOperation(this);
// @formatter:on
}
/**
- * Gets the name of the actor from the policy.
- *
- * @return the actor name, or {@link #UNKNOWN} if no name is available
- */
- public String getActor() {
- return (policy == null || policy.getActor() == null ? UNKNOWN : policy.getActor());
- }
-
- /**
- * Gets the name of the operation from the policy.
- *
- * @return the operation name, or {@link #UNKNOWN} if no name is available
- */
- public String getOperation() {
- return (policy == null || policy.getRecipe() == null ? UNKNOWN : policy.getRecipe());
- }
-
- /**
* Gets the requested ID of the associated event.
*
* @return the event's request ID, or {@code null} if no request ID is available
@@ -158,13 +166,13 @@ public class ControlLoopOperationParams {
*
* @return a new operation outcome
*/
- public ControlLoopOperation makeOutcome() {
- ControlLoopOperation operation = new ControlLoopOperation();
- operation.setActor(getActor());
- operation.setOperation(getOperation());
- operation.setTarget(target);
+ public OperationOutcome makeOutcome() {
+ OperationOutcome outcome = new OperationOutcome();
+ outcome.setActor(getActor());
+ outcome.setOperation(getOperation());
+ outcome.setTarget(targetEntity);
- return operation;
+ return outcome;
}
/**
@@ -173,11 +181,11 @@ public class ControlLoopOperationParams {
*
* @param operation the operation that is being started
*/
- public void callbackStarted(ControlLoopOperation operation) {
+ public void callbackStarted(OperationOutcome operation) {
logger.info("started operation {}.{} for {}", operation.getActor(), operation.getOperation(), getRequestId());
if (startCallback != null) {
- Util.logException(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}",
+ Util.runFunction(() -> startCallback.accept(operation), "{}.{}: start-callback threw an exception for {}",
operation.getActor(), operation.getOperation(), getRequestId());
}
}
@@ -188,12 +196,12 @@ public class ControlLoopOperationParams {
*
* @param operation the operation that is being started
*/
- public void callbackCompleted(ControlLoopOperation operation) {
+ public void callbackCompleted(OperationOutcome operation) {
logger.info("completed operation {}.{} outcome={} for {}", operation.getActor(), operation.getOperation(),
- operation.getOutcome(), getRequestId());
+ operation.getResult(), getRequestId());
if (completeCallback != null) {
- Util.logException(() -> completeCallback.accept(operation),
+ Util.runFunction(() -> completeCallback.accept(operation),
"{}.{}: complete-callback threw an exception for {}", operation.getActor(),
operation.getOperation(), getRequestId());
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
index d34a3fb5b..1d64a8710 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/ListenerManager.java
@@ -101,7 +101,7 @@ public class ListenerManager {
*/
protected void runListener(Runnable listener) {
// TODO do this asynchronously?
- Util.logException(listener, "pipeline listener {} threw an exception", listener);
+ Util.runFunction(listener, "pipeline listener {} threw an exception", listener);
}
/**
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
index 96c8f9e05..92843e28a 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java
@@ -23,41 +23,70 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline;
import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Pipeline controller, used by operations within the pipeline to determine if they should
- * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
- * pipeline.
+ * continue to run. Whenever this is canceled or completed, it automatically cancels all
+ * futures and runs all listeners that have been added.
*/
@NoArgsConstructor
public class PipelineControllerFuture<T> extends CompletableFuture<T> {
private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
+ private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception";
+ private static final String CANCEL_MSG = "{}: cancel future";
+ private static final String COMPLETE_MSG = "{}: complete future";
+
/**
* Tracks items added to this controller via one of the <i>add</i> methods.
*/
private final FutureManager futures = new FutureManager();
- /**
- * Cancels and stops the pipeline, in that order.
- */
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- try {
- logger.trace("{}: cancel future", ident(this));
- return super.cancel(mayInterruptIfRunning);
+ return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this));
+ }
- } finally {
- futures.stop();
- }
+ @Override
+ public boolean complete(T value) {
+ return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this));
+ }
+
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this));
+ }
+
+ @Override
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) {
+ return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor);
+ }
+
+ @Override
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
+ return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)));
+ }
+
+ @Override
+ public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {
+ logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit);
+ return super.completeOnTimeout(value, timeout, unit);
+ }
+
+ @Override
+ public <U> PipelineControllerFuture<U> newIncompleteFuture() {
+ return new PipelineControllerFuture<>();
}
/**
@@ -67,11 +96,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
*
* @return a function that removes the given future
*/
- public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
- return (value, thrown) -> {
- logger.trace("{}: remove future {}", ident(this), ident(future));
- remove(future);
- };
+ public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) {
+ return (value, thrown) -> remove(future);
}
/**
@@ -81,11 +107,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
*
* @return a function that removes the given listener
*/
- public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
- return (value, thrown) -> {
- logger.trace("{}: remove listener {}", ident(this), ident(listener));
- remove(listener);
- };
+ public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) {
+ return (value, thrown) -> remove(listener);
}
/**
@@ -98,25 +121,43 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
public BiConsumer<T, Throwable> delayedComplete() {
return (value, thrown) -> {
if (thrown == null) {
- logger.trace("{}: complete and stop future", ident(this));
complete(value);
} else {
- logger.trace("{}: complete exceptionally and stop future", ident(this));
completeExceptionally(thrown);
}
-
- futures.stop();
};
}
/**
+ * Adds a future to the controller and arranges for it to be removed from the
+ * controller when it completes, whether or not it throws an exception. If the
+ * controller has already been stopped, then the future is canceled and a new,
+ * incomplete future is returned.
+ *
+ * @param future future to be wrapped
+ * @return a new future
+ */
+ public CompletableFuture<T> wrap(CompletableFuture<T> future) {
+ if (!isRunning()) {
+ logger.trace("{}: not running, skipping next task {}", ident(this), ident(future));
+ future.cancel(false);
+ return new CompletableFuture<>();
+ }
+
+ add(future);
+ return future.whenComplete(this.delayedRemove(future));
+ }
+
+ /**
* Adds a function whose return value is to be canceled when this controller is
* stopped. Note: if the controller is already stopped, then the function will
* <i>not</i> be executed.
*
- * @param futureMaker function to be invoked in the future
+ * @param futureMaker function to be invoked to create the future
+ * @return a function to create the future and arrange for it to be managed by this
+ * controller
*/
- public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
+ public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) {
return input -> {
if (!isRunning()) {
@@ -127,7 +168,7 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
CompletableFuture<F> future = futureMaker.apply(input);
add(future);
- return future;
+ return future.whenComplete(delayedRemove(future));
};
}
@@ -154,4 +195,26 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
logger.trace("{}: remove listener {}", ident(this), ident(listener));
futures.remove(listener);
}
+
+ /**
+ * Performs an operation, stops the futures, and returns the value from the operation.
+ * Logs a message using the given arguments.
+ *
+ *
+ * @param <R> type of value to be returned
+ * @param supplier operation to perform
+ * @param message message to be logged
+ * @param args message arguments to fill "{}" place-holders
+ * @return the operation's result
+ */
+ private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) {
+ try {
+ logger.trace(message, args);
+ return supplier.get();
+
+ } finally {
+ logger.trace("{}: stopping this future", ident(this));
+ futures.stop();
+ }
+ }
}