path: root/models-interactions/model-actors/actorServiceProvider/src/main
diff options
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
14 files changed, 1274 insertions, 973 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java
deleted file mode 100644
index d78403809..000000000
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java
+++ /dev/null
@@ -1,119 +0,0 @@
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.controlloop.actorserviceprovider;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import javax.ws.rs.client.InvocationCallback;
-import lombok.AccessLevel;
-import lombok.Getter;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * Handler for a <i>single</i> asynchronous response.
- *
- * @param <T> response type
- */
-public abstract class AsyncResponseHandler<T> implements InvocationCallback<T> {
- private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class);
- @Getter(AccessLevel.NONE)
- private final PipelineControllerFuture<OperationOutcome> result = new PipelineControllerFuture<>();
- private final ControlLoopOperationParams params;
- private final OperationOutcome outcome;
- /**
- * Constructs the object.
- *
- * @param params operation parameters
- * @param outcome outcome to be populated based on the response
- */
- public AsyncResponseHandler(ControlLoopOperationParams params, OperationOutcome outcome) {
- this.params = params;
- this.outcome = outcome;
- }
- /**
- * Handles the given future, arranging to cancel it when the response is received.
- *
- * @param future future to be handled
- * @return a future to be used to cancel or wait for the response
- */
- public CompletableFuture<OperationOutcome> handle(Future<T> future) {
- result.add(future);
- return result;
- }
- /**
- * Invokes {@link #doComplete()} and then completes "this" with the returned value.
- */
- @Override
- public void completed(T rawResponse) {
- try {
- logger.trace("{}.{}: response completed for {}", params.getActor(), params.getOperation(),
- params.getRequestId());
- result.complete(doComplete(rawResponse));
- } catch (RuntimeException e) {
- logger.trace("{}.{}: response handler threw an exception for {}", params.getActor(), params.getOperation(),
- params.getRequestId());
- result.completeExceptionally(e);
- }
- }
- /**
- * Invokes {@link #doFailed()} and then completes "this" with the returned value.
- */
- @Override
- public void failed(Throwable throwable) {
- try {
- logger.trace("{}.{}: response failure for {}", params.getActor(), params.getOperation(),
- params.getRequestId());
- result.complete(doFailed(throwable));
- } catch (RuntimeException e) {
- logger.trace("{}.{}: response failure handler threw an exception for {}", params.getActor(),
- params.getOperation(), params.getRequestId());
- result.completeExceptionally(e);
- }
- }
- /**
- * Completes the processing of a response.
- *
- * @param rawResponse raw response that was received
- * @return the outcome
- */
- protected abstract OperationOutcome doComplete(T rawResponse);
- /**
- * Handles a response exception.
- *
- * @param thrown exception that was thrown
- * @return the outcome
- */
- protected abstract OperationOutcome doFailed(Throwable thrown);
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operation.java
new file mode 100644
index 000000000..39977fd41
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operation.java
@@ -0,0 +1,52 @@
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.controlloop.actorserviceprovider;
+import java.util.concurrent.CompletableFuture;
+ * This is the service interface for defining an Actor operation used in Control Loop
+ * Operational Policies for performing actions on runtime entities.
+ */
+public interface Operation {
+ /**
+ * Gets the name of the associated actor.
+ *
+ * @return the name of the associated actor
+ */
+ String getActorName();
+ /**
+ * Gets the name of the operation.
+ *
+ * @return the operation name
+ */
+ String getName();
+ /**
+ * Called by enforcement PDP engine to start the operation. As part of the operation,
+ * it invokes the "start" and "complete" call-backs found within the parameters.
+ *
+ * @return a future that can be used to cancel or await the result of the operation
+ */
+ CompletableFuture<OperationOutcome> start();
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
index c09460e34..24faafd40 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java
@@ -21,7 +21,6 @@
package org.onap.policy.controlloop.actorserviceprovider;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.onap.policy.common.capabilities.Configurable;
import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
@@ -47,11 +46,10 @@ public interface Operator extends Startable, Configurable<Map<String, Object>> {
String getName();
- * Called by enforcement PDP engine to start the operation. As part of the operation,
- * it invokes the "start" and "complete" call-backs found within the parameters.
+ * Called by enforcement PDP engine to build the operation.
- * @param params parameters needed to start the operation
- * @return a future that can be used to cancel or await the result of the operation
+ * @param params parameters needed by the operation
+ * @return a new operation
- CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params);
+ Operation buildOperation(ControlLoopOperationParams params);
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
index c3ddd17f3..b885b5c25 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java
@@ -23,9 +23,6 @@ package org.onap.policy.controlloop.actorserviceprovider;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
-import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
@@ -56,82 +53,6 @@ public class Util {
- * Logs a REST request. If the request is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param url request URL
- * @param request request to be logged
- */
- public static <T> void logRestRequest(String url, T request) {
- logRestRequest(new StandardCoder(), url, request);
- }
- /**
- * Logs a REST request. If the request is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param coder coder to be used to pretty-print the request
- * @param url request URL
- * @param request request to be logged
- */
- protected static <T> void logRestRequest(Coder coder, String url, T request) {
- String json;
- try {
- if (request instanceof String) {
- json = request.toString();
- } else {
- json = coder.encode(request, true);
- }
- } catch (CoderException e) {
- logger.warn("cannot pretty-print request", e);
- json = request.toString();
- }
- NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json);
- logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
- }
- /**
- * Logs a REST response. If the response is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param url request URL
- * @param response response to be logged
- */
- public static <T> void logRestResponse(String url, T response) {
- logRestResponse(new StandardCoder(), url, response);
- }
- /**
- * Logs a REST response. If the request is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param coder coder to be used to pretty-print the response
- * @param url request URL
- * @param response response to be logged
- */
- protected static <T> void logRestResponse(Coder coder, String url, T response) {
- String json;
- try {
- if (response == null) {
- json = null;
- } else if (response instanceof String) {
- json = response.toString();
- } else {
- json = coder.encode(response, true);
- }
- } catch (CoderException e) {
- logger.warn("cannot pretty-print response", e);
- json = response.toString();
- }
- NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json);
- logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
- }
- /**
* Runs a function and logs a message if it throws an exception. Does <i>not</i>
* re-throw the exception.
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
index cd4d2570f..1c37a8e0d 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java
@@ -23,12 +23,15 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
* Context associated with a control loop event.
@@ -47,11 +50,23 @@ public class ControlLoopEventContext implements Serializable {
private final Map<String, String> enrichment;
+ /**
+ * Set of properties that have been stored in the context.
+ */
private Map<String, Serializable> properties = new ConcurrentHashMap<>();
+ * When {@link #obtain(String, ControlLoopOperationParams)} is invoked and the
+ * specified property is not found in {@link #properties}, it is retrieved. This holds
+ * the futures for the operations retrieving the properties.
+ */
+ @Getter(AccessLevel.NONE)
+ @Setter(AccessLevel.NONE)
+ private transient Map<String, CompletableFuture<OperationOutcome>> retrievers = new ConcurrentHashMap<>();
+ /**
* Request ID extracted from the event, or a generated value if the event has no
* request id; never {@code null}.
@@ -100,4 +115,34 @@ public class ControlLoopEventContext implements Serializable {
public void setProperty(String name, Serializable value) {
properties.put(name, value);
+ /**
+ * Obtains the given property.
+ *
+ * @param name name of the desired property
+ * @param params parameters needed to perform the operation to retrieve the desired
+ * property
+ * @return a future for retrieving the property, {@code null} if the property has
+ * already been retrieved
+ */
+ public CompletableFuture<OperationOutcome> obtain(String name, ControlLoopOperationParams params) {
+ if (properties.containsKey(name)) {
+ return null;
+ }
+ CompletableFuture<OperationOutcome> future = retrievers.get(name);
+ if (future != null) {
+ return future;
+ }
+ future = params.start();
+ CompletableFuture<OperationOutcome> oldFuture = retrievers.putIfAbsent(name, future);
+ if (oldFuture != null) {
+ future.cancel(false);
+ return oldFuture;
+ }
+ return future;
+ }
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
index d7f322e8a..0c88ebee2 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java
@@ -91,7 +91,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement
public Operator getOperator(String name) {
Operator operator = name2operator.get(name);
if (operator == null) {
- throw new IllegalArgumentException("unknown operation " + getName() + "." + name);
+ throw new IllegalArgumentException("unknown operator " + getName() + "." + name);
return operator;
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
new file mode 100644
index 000000000..c4bf5f484
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
@@ -0,0 +1,286 @@
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.core.Response;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.http.client.HttpClient;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ * Operator that uses HTTP. The operator's parameters must be an {@link HttpParams}.
+ *
+ * @param <T> response type
+ */
+public abstract class HttpOperation<T> extends OperationPartial {
+ private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class);
+ private static final Coder coder = new StandardCoder();
+ /**
+ * Operator that created this operation.
+ */
+ protected final HttpOperator operator;
+ /**
+ * Response class.
+ */
+ private final Class<T> responseClass;
+ /**
+ * Constructs the object.
+ *
+ * @param params operation parameters
+ * @param operator operator that created this operation
+ * @param clazz response class
+ */
+ public HttpOperation(ControlLoopOperationParams params, HttpOperator operator, Class<T> clazz) {
+ super(params, operator);
+ this.operator = operator;
+ this.responseClass = clazz;
+ }
+ /**
+ * If no timeout is specified, then it returns the operator's configured timeout.
+ */
+ @Override
+ protected long getTimeoutMs(Integer timeoutSec) {
+ return (timeoutSec == null || timeoutSec == 0 ? operator.getTimeoutMs() : super.getTimeoutMs(timeoutSec));
+ }
+ /**
+ * Makes the request headers. This simply returns an empty map.
+ *
+ * @return request headers, a non-null, modifiable map
+ */
+ protected Map<String, Object> makeHeaders() {
+ return new HashMap<>();
+ }
+ /**
+ * Gets the path to be used when performing the request; this is typically appended to
+ * the base URL. This method simply invokes {@link #getPath()}.
+ *
+ * @return the path URI suffix
+ */
+ public String makePath() {
+ return operator.getPath();
+ }
+ /**
+ * Makes the URL to which the "get" request should be posted. This ir primarily used
+ * for logging purposes. This particular method returns the base URL appended with the
+ * return value from {@link #makePath()}.
+ *
+ * @return the URL to which from which to get
+ */
+ public String makeUrl() {
+ return (operator.getClient().getBaseUrl() + makePath());
+ }
+ /**
+ * Arranges to handle a response.
+ *
+ * @param outcome outcome to be populate
+ * @param url URL to which to request was sent
+ * @param requester function to initiate the request and invoke the given callback
+ * when it completes
+ * @return a future for the response
+ */
+ protected CompletableFuture<OperationOutcome> handleResponse(OperationOutcome outcome, String url,
+ Function<InvocationCallback<Response>, Future<Response>> requester) {
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ final CompletableFuture<Response> future = new CompletableFuture<>();
+ final Executor executor = params.getExecutor();
+ // arrange for the callback to complete "future"
+ InvocationCallback<Response> callback = new InvocationCallback<>() {
+ @Override
+ public void completed(Response response) {
+ future.complete(response);
+ }
+ @Override
+ public void failed(Throwable throwable) {
+ logger.warn("{}.{}: response failure for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ future.completeExceptionally(throwable);
+ }
+ };
+ // start the request and arrange to cancel it if the controller is canceled
+ controller.add(requester.apply(callback));
+ // once "future" completes, process the response, and then complete the controller
+ future.thenApplyAsync(response -> processResponse(outcome, url, response), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ return controller;
+ }
+ /**
+ * Processes a response. This method simply sets the outcome to SUCCESS.
+ *
+ * @param outcome outcome to be populate
+ * @param url URL to which to request was sent
+ * @param response raw response to process
+ * @return the outcome
+ */
+ protected OperationOutcome processResponse(OperationOutcome outcome, String url, Response rawResponse) {
+ logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
+ String strResponse = HttpClient.getBody(rawResponse, String.class);
+ logRestResponse(url, strResponse);
+ T response;
+ if (responseClass == String.class) {
+ response = responseClass.cast(strResponse);
+ } else {
+ try {
+ response = makeCoder().decode(strResponse, responseClass);
+ } catch (CoderException e) {
+ logger.warn("{}.{} cannot decode response with http error code {} for {}", params.getActor(),
+ params.getOperation(), rawResponse.getStatus(), params.getRequestId(), e);
+ return setOutcome(outcome, PolicyResult.FAILURE_EXCEPTION);
+ }
+ }
+ if (!isSuccess(rawResponse, response)) {
+ logger.info("{}.{} request failed with http error code {} for {}", params.getActor(), params.getOperation(),
+ rawResponse.getStatus(), params.getRequestId());
+ return setOutcome(outcome, PolicyResult.FAILURE);
+ }
+ logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
+ setOutcome(outcome, PolicyResult.SUCCESS);
+ postProcessResponse(outcome, url, rawResponse, response);
+ return outcome;
+ }
+ /**
+ * Processes a successful response.
+ *
+ * @param outcome outcome to be populate
+ * @param url URL to which to request was sent
+ * @param rawResponse raw response
+ * @param response decoded response
+ */
+ protected void postProcessResponse(OperationOutcome outcome, String url, Response rawResponse, T response) {
+ // do nothing
+ }
+ /**
+ * Determines if the response indicates success. This method simply checks the HTTP
+ * status code.
+ *
+ * @param rawResponse raw response
+ * @param response decoded response
+ * @return {@code true} if the response indicates success, {@code false} otherwise
+ */
+ protected boolean isSuccess(Response rawResponse, T response) {
+ return (rawResponse.getStatus() == 200);
+ }
+ /**
+ * Logs a REST request. If the request is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param url request URL
+ * @param request request to be logged
+ */
+ public <Q> void logRestRequest(String url, Q request) {
+ String json;
+ try {
+ if (request == null) {
+ json = null;
+ } else if (request instanceof String) {
+ json = request.toString();
+ } else {
+ json = makeCoder().encode(request, true);
+ }
+ } catch (CoderException e) {
+ logger.warn("cannot pretty-print request", e);
+ json = request.toString();
+ }
+ NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json);
+ logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
+ }
+ /**
+ * Logs a REST response. If the response is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param url request URL
+ * @param response response to be logged
+ */
+ public <S> void logRestResponse(String url, S response) {
+ String json;
+ try {
+ if (response == null) {
+ json = null;
+ } else if (response instanceof String) {
+ json = response.toString();
+ } else {
+ json = makeCoder().encode(response, true);
+ }
+ } catch (CoderException e) {
+ logger.warn("cannot pretty-print response", e);
+ json = response.toString();
+ }
+ NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json);
+ logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
+ }
+ // these may be overridden by junit tests
+ protected Coder makeCoder() {
+ return coder;
+ }
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java
index 566492907..add74aa42 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java
@@ -21,31 +21,35 @@
package org.onap.policy.controlloop.actorserviceprovider.impl;
import java.util.Map;
-import lombok.AccessLevel;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import lombok.Getter;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
- * Operator that uses HTTP. The operator's parameters must be a {@link HttpParams}.
+ * Operator that uses HTTP. The operator's parameters must be an {@link HttpParams}.
-public class HttpOperator extends OperatorPartial {
+public abstract class HttpOperator extends OperatorPartial {
- @Getter(AccessLevel.PROTECTED)
private HttpClient client;
- @Getter
- private long timeoutSec;
+ /**
+ * Default timeout, in milliseconds, if none specified in the request.
+ */
+ private long timeoutMs;
- * URI path for this particular operation.
+ * URI path for this particular operation. Includes a leading "/".
- @Getter
private String path;
@@ -60,6 +64,26 @@ public class HttpOperator extends OperatorPartial {
+ * Makes an operator that will construct operations.
+ *
+ * @param <T> response type
+ * @param actorName actor name
+ * @param operation operation name
+ * @param operationMaker function to make an operation
+ * @return a new operator
+ */
+ public static <T> HttpOperator makeOperator(String actorName, String operation,
+ BiFunction<ControlLoopOperationParams, HttpOperator, HttpOperation<T>> operationMaker) {
+ return new HttpOperator(actorName, operation) {
+ @Override
+ public Operation buildOperation(ControlLoopOperationParams params) {
+ return operationMaker.apply(params, this);
+ }
+ };
+ }
+ /**
* Translates the parameters to an {@link HttpParams} and then extracts the relevant
* values.
@@ -73,10 +97,10 @@ public class HttpOperator extends OperatorPartial {
client = getClientFactory().get(params.getClientName());
path = params.getPath();
- timeoutSec = params.getTimeoutSec();
+ timeoutMs = TimeUnit.MILLISECONDS.convert(params.getTimeoutSec(), TimeUnit.SECONDS);
- // these may be overridden by junits
+ // these may be overridden by junit tests
protected HttpClientFactory getClientFactory() {
return HttpClientFactoryInstance.getClientFactory();
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
new file mode 100644
index 000000000..d00b88bb5
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
@@ -0,0 +1,844 @@
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ * Partial implementation of an operator. In general, it's preferable that subclasses
+ * would override {@link #startOperationAsync(int, OperationOutcome)
+ * startOperationAsync()}. However, if that proves to be too difficult, then they can
+ * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
+ * if the operation requires any preprocessor steps, the subclass may choose to override
+ * {@link #startPreprocessorAsync()}.
+ * <p/>
+ * The futures returned by the methods within this class can be canceled, and will
+ * propagate the cancellation to any subtasks. Thus it is also expected that any futures
+ * returned by overridden methods will do the same. Of course, if a class overrides
+ * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
+ * be done to cancel that particular operation.
+ */
+public abstract class OperationPartial implements Operation {
+ private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
+ public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
+ // values extracted from the operator
+ private final OperatorPartial operator;
+ /**
+ * Operation parameters.
+ */
+ protected final ControlLoopOperationParams params;
+ /**
+ * Constructs the object.
+ *
+ * @param params operation parameters
+ * @param operator operator that created this operation
+ */
+ public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
+ this.params = params;
+ this.operator = operator;
+ }
+ public Executor getBlockingExecutor() {
+ return operator.getBlockingExecutor();
+ }
+ public String getFullName() {
+ return operator.getFullName();
+ }
+ public String getActorName() {
+ return operator.getActorName();
+ }
+ public String getName() {
+ return operator.getName();
+ }
+ @Override
+ public final CompletableFuture<OperationOutcome> start() {
+ if (!operator.isAlive()) {
+ throw new IllegalStateException("operation is not running: " + getFullName());
+ }
+ // allocate a controller for the entire operation
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
+ if (preproc == null) {
+ // no preprocessor required - just start the operation
+ return startOperationAttempt(controller, 1);
+ }
+ /*
+ * Do preprocessor first and then, if successful, start the operation. Note:
+ * operations create their own outcome, ignoring the outcome from any previous
+ * steps.
+ *
+ * Wrap the preprocessor to ensure "stop" is propagated to it.
+ */
+ // @formatter:off
+ controller.wrap(preproc)
+ .exceptionally(fromException("preprocessor of operation"))
+ .thenCompose(handlePreprocessorFailure(controller))
+ .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
+ .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
+ // @formatter:on
+ return controller;
+ }
+ /**
+ * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
+ * invokes the call-backs, marks the controller complete, and returns an incomplete
+ * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
+ * received.
+ * <p/>
+ * Assumes that no callbacks have been invoked yet.
+ *
+ * @param controller pipeline controller
+ * @return a function that checks the outcome status and continues, if successful, or
+ * indicates a failure otherwise
+ */
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
+ PipelineControllerFuture<OperationOutcome> controller) {
+ return outcome -> {
+ if (outcome != null && isSuccess(outcome)) {
+ logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
+ return CompletableFuture.completedFuture(outcome);
+ }
+ logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
+ final Executor executor = params.getExecutor();
+ final CallbackManager callbacks = new CallbackManager();
+ // propagate "stop" to the callbacks
+ controller.add(callbacks);
+ final OperationOutcome outcome2 = params.makeOutcome();
+ // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
+ outcome2.setResult(PolicyResult.FAILURE_GUARD);
+ outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
+ // @formatter:off
+ CompletableFuture.completedFuture(outcome2)
+ .whenCompleteAsync(callbackStarted(callbacks), executor)
+ .whenCompleteAsync(callbackCompleted(callbacks), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+ return new CompletableFuture<>();
+ };
+ }
+ /**
+ * Invokes the operation's preprocessor step(s) as a "future". This method simply
+ * invokes {@link #startGuardAsync()}.
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @return a function that will start the preprocessor and returns its outcome, or
+ * {@code null} if this operation needs no preprocessor
+ */
+ protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
+ return startGuardAsync();
+ }
+ /**
+ * Invokes the operation's guard step(s) as a "future". This method simply returns
+ * {@code null}.
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @return a function that will start the guard checks and returns its outcome, or
+ * {@code null} if this operation has no guard
+ */
+ protected CompletableFuture<OperationOutcome> startGuardAsync() {
+ return null;
+ }
+ /**
+ * Starts the operation attempt, with no preprocessor. When all retries complete, it
+ * will complete the controller.
+ *
+ * @param controller controller for all operation attempts
+ * @param attempt attempt number, typically starting with 1
+ * @return a future that will return the final result of all attempts
+ */
+ private CompletableFuture<OperationOutcome> startOperationAttempt(
+ PipelineControllerFuture<OperationOutcome> controller, int attempt) {
+ // propagate "stop" to the operation attempt
+ controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
+ .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
+ return controller;
+ }
+ /**
+ * Starts the operation attempt, without doing any retries.
+ *
+ * @param params operation parameters
+ * @param attempt attempt number, typically starting with 1
+ * @return a future that will return the result of a single operation attempt
+ */
+ private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
+ logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
+ final Executor executor = params.getExecutor();
+ final OperationOutcome outcome = params.makeOutcome();
+ final CallbackManager callbacks = new CallbackManager();
+ // this operation attempt gets its own controller
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ // propagate "stop" to the callbacks
+ controller.add(callbacks);
+ // @formatter:off
+ CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
+ .whenCompleteAsync(callbackStarted(callbacks), executor)
+ .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
+ // @formatter:on
+ // handle timeouts, if specified
+ long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
+ if (timeoutMillis > 0) {
+ logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
+ future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+ /*
+ * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
+ * before callbackCompleted() is invoked.
+ *
+ * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
+ * the pipeline as the last step anyway.
+ */
+ // @formatter:off
+ future.exceptionally(fromException("operation"))
+ .thenApply(setRetryFlag(attempt))
+ .whenCompleteAsync(callbackStarted(callbacks), executor)
+ .whenCompleteAsync(callbackCompleted(callbacks), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+ return controller;
+ }
+ /**
+ * Determines if the outcome was successful.
+ *
+ * @param outcome outcome to examine
+ * @return {@code true} if the outcome was successful
+ */
+ protected boolean isSuccess(OperationOutcome outcome) {
+ return (outcome.getResult() == PolicyResult.SUCCESS);
+ }
+ /**
+ * Determines if the outcome was a failure for this operator.
+ *
+ * @param outcome outcome to examine, or {@code null}
+ * @return {@code true} if the outcome is not {@code null} and was a failure
+ * <i>and</i> was associated with this operator, {@code false} otherwise
+ */
+ protected boolean isActorFailed(OperationOutcome outcome) {
+ return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
+ }
+ /**
+ * Determines if the given outcome is for this operation.
+ *
+ * @param outcome outcome to examine
+ * @return {@code true} if the outcome is for this operation, {@code false} otherwise
+ */
+ protected boolean isSameOperation(OperationOutcome outcome) {
+ return OperationOutcome.isFor(outcome, getActorName(), getName());
+ }
+ /**
+ * Invokes the operation as a "future". This method simply invokes
+ * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
+ * returning the result via a "future".
+ * <p/>
+ * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
+ * the executor in the "params", as that may bring the background thread pool to a
+ * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
+ * instead.
+ * <p/>
+ * This method assumes the following:
+ * <ul>
+ * <li>the operator is alive</li>
+ * <li>verifyRunning() has been invoked</li>
+ * <li>callbackStarted() has been invoked</li>
+ * <li>the invoker will perform appropriate timeout checks</li>
+ * <li>exceptions generated within the pipeline will be handled by the invoker</li>
+ * </ul>
+ *
+ * @param attempt attempt number, typically starting with 1
+ * @return a function that will start the operation and return its result when
+ * complete
+ */
+ protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
+ return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
+ }
+ /**
+ * Low-level method that performs the operation. This can make the same assumptions
+ * that are made by {@link #doOperationAsFuture()}. This particular method simply
+ * throws an {@link UnsupportedOperationException}.
+ *
+ * @param attempt attempt number, typically starting with 1
+ * @param operation the operation being performed
+ * @return the outcome of the operation
+ */
+ protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
+ throw new UnsupportedOperationException("start operation " + getFullName());
+ }
+ /**
+ * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
+ * FAILURE, assuming the policy specifies retries and the retry count has been
+ * exhausted.
+ *
+ * @param attempt latest attempt number, starting with 1
+ * @return a function to get the next future to execute
+ */
+ private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
+ return operation -> {
+ if (operation != null && !isActorFailed(operation)) {
+ /*
+ * wrong type or wrong operation - just leave it as is. No need to log
+ * anything here, as retryOnFailure() will log a message
+ */
+ return operation;
+ }
+ // get a non-null operation
+ OperationOutcome oper2;
+ if (operation != null) {
+ oper2 = operation;
+ } else {
+ oper2 = params.makeOutcome();
+ oper2.setResult(PolicyResult.FAILURE);
+ }
+ int retry = getRetry(params.getRetry());
+ if (retry > 0 && attempt > retry) {
+ /*
+ * retries were specified and we've already tried them all - change to
+ */
+ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
+ oper2.setResult(PolicyResult.FAILURE_RETRIES);
+ }
+ return oper2;
+ };
+ }
+ /**
+ * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
+ * was previously invoked, and thus that the "operation" is not {@code null}.
+ *
+ * @param controller controller for all of the retries
+ * @param attempt latest attempt number, starting with 1
+ * @return a function to get the next future to execute
+ */
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
+ PipelineControllerFuture<OperationOutcome> controller, int attempt) {
+ return operation -> {
+ if (!isActorFailed(operation)) {
+ // wrong type or wrong operation - just leave it as is
+ logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
+ controller.complete(operation);
+ return new CompletableFuture<>();
+ }
+ if (getRetry(params.getRetry()) <= 0) {
+ // no retries - already marked as FAILURE, so just return it
+ logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
+ controller.complete(operation);
+ return new CompletableFuture<>();
+ }
+ /*
+ * Retry the operation.
+ */
+ long waitMs = getRetryWaitMs();
+ logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
+ return sleep(waitMs, TimeUnit.MILLISECONDS)
+ .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
+ };
+ }
+ /**
+ * Convenience method that starts a sleep(), running via a future.
+ *
+ * @param sleepTime time to sleep
+ * @param unit time unit
+ * @return a future that will complete when the sleep completes
+ */
+ protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
+ if (sleepTime <= 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
+ }
+ /**
+ * Converts an exception into an operation outcome, returning a copy of the outcome to
+ * prevent background jobs from changing it.
+ *
+ * @param type type of item throwing the exception
+ * @return a function that will convert an exception into an operation outcome
+ */
+ private Function<Throwable, OperationOutcome> fromException(String type) {
+ return thrown -> {
+ OperationOutcome outcome = params.makeOutcome();
+ logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+ params.getRequestId(), thrown);
+ return setOutcome(outcome, thrown);
+ };
+ }
+ /**
+ * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+ * any outstanding futures when one completes.
+ *
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) {
+ // convert list to an array
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result = anyOf(arrFutures);
+ return result;
+ }
+ /**
+ * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
+ * outstanding futures when one completes.
+ *
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> anyOf(
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+ if (futures.length == 1) {
+ return futures[0];
+ }
+ final Executor executor = params.getExecutor();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ attachFutures(controller, futures);
+ // @formatter:off
+ CompletableFuture.anyOf(futures)
+ .thenApply(object -> (OperationOutcome) object)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+ return controller;
+ }
+ /**
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
+ * the futures if returned future is canceled. The future returns the "worst" outcome,
+ * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ *
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) {
+ // convert list to an array
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result = allOf(arrFutures);
+ return result;
+ }
+ /**
+ * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
+ * futures if returned future is canceled. The future returns the "worst" outcome,
+ * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ *
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> allOf(
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+ if (futures.length == 1) {
+ return futures[0];
+ }
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ attachFutures(controller, futures);
+ OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+ // record the outcomes of each future when it completes
+ for (int count = 0; count < futures2.length; ++count) {
+ final int count2 = count;
+ futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
+ }
+ // @formatter:off
+ CompletableFuture.allOf(futures2)
+ .thenApply(unused -> combineOutcomes(outcomes))
+ .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
+ // @formatter:on
+ return controller;
+ }
+ /**
+ * Attaches the given futures to the controller.
+ *
+ * @param controller master controller for all of the futures
+ * @param futures futures to be attached to the controller
+ */
+ private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+ if (futures.length == 0) {
+ throw new IllegalArgumentException("empty list of futures");
+ }
+ // attach each task
+ for (CompletableFuture<OperationOutcome> future : futures) {
+ controller.add(future);
+ }
+ }
+ /**
+ * Combines the outcomes from a set of tasks.
+ *
+ * @param outcomes outcomes to be examined
+ * @return the combined outcome
+ */
+ private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
+ // identify the outcome with the highest priority
+ OperationOutcome outcome = outcomes[0];
+ int priority = detmPriority(outcome);
+ // start with "1", as we've already dealt with "0"
+ for (int count = 1; count < outcomes.length; ++count) {
+ OperationOutcome outcome2 = outcomes[count];
+ int priority2 = detmPriority(outcome2);
+ if (priority2 > priority) {
+ outcome = outcome2;
+ priority = priority2;
+ }
+ }
+ logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
+ (outcome == null ? null : outcome.getResult()), params.getRequestId());
+ return outcome;
+ }
+ /**
+ * Determines the priority of an outcome based on its result.
+ *
+ * @param outcome outcome to examine, or {@code null}
+ * @return the outcome's priority
+ */
+ protected int detmPriority(OperationOutcome outcome) {
+ if (outcome == null || outcome.getResult() == null) {
+ return 1;
+ }
+ switch (outcome.getResult()) {
+ case SUCCESS:
+ return 0;
+ return 2;
+ return 3;
+ case FAILURE:
+ return 4;
+ return 5;
+ default:
+ return 6;
+ }
+ }
+ /**
+ * Performs a task, after verifying that the controller is still running. Also checks
+ * that the previous outcome was successful, if specified.
+ *
+ * @param controller overall pipeline controller
+ * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+ * otherwise
+ * @param outcome outcome of the previous task
+ * @param task task to be performed
+ * @return the task, if everything checks out. Otherwise, it returns an incomplete
+ * future and completes the controller instead
+ */
+ // @formatter:off
+ protected CompletableFuture<OperationOutcome> doTask(
+ PipelineControllerFuture<OperationOutcome> controller,
+ boolean checkSuccess, OperationOutcome outcome,
+ CompletableFuture<OperationOutcome> task) {
+ // @formatter:on
+ if (checkSuccess && !isSuccess(outcome)) {
+ /*
+ * must complete before canceling so that cancel() doesn't cause controller to
+ * complete
+ */
+ controller.complete(outcome);
+ task.cancel(false);
+ return new CompletableFuture<>();
+ }
+ return controller.wrap(task);
+ }
+ /**
+ * Performs a task, after verifying that the controller is still running. Also checks
+ * that the previous outcome was successful, if specified.
+ *
+ * @param controller overall pipeline controller
+ * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+ * otherwise
+ * @param task function to start the task to be performed
+ * @return a function to perform the task. If everything checks out, then it returns
+ * the task. Otherwise, it returns an incomplete future and completes the
+ * controller instead
+ */
+ // @formatter:off
+ protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(
+ PipelineControllerFuture<OperationOutcome> controller,
+ boolean checkSuccess,
+ Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
+ // @formatter:on
+ return outcome -> {
+ if (!controller.isRunning()) {
+ return new CompletableFuture<>();
+ }
+ if (checkSuccess && !isSuccess(outcome)) {
+ controller.complete(outcome);
+ return new CompletableFuture<>();
+ }
+ return controller.wrap(task.apply(outcome));
+ };
+ }
+ /**
+ * Sets the start time of the operation and invokes the callback to indicate that the
+ * operation has started. Does nothing if the pipeline has been stopped.
+ * <p/>
+ * This assumes that the "outcome" is not {@code null}.
+ *
+ * @param callbacks used to determine if the start callback can be invoked
+ * @return a function that sets the start time and invokes the callback
+ */
+ private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
+ return (outcome, thrown) -> {
+ if (callbacks.canStart()) {
+ // haven't invoked "start" callback yet
+ outcome.setStart(callbacks.getStartTime());
+ outcome.setEnd(null);
+ params.callbackStarted(outcome);
+ }
+ };
+ }
+ /**
+ * Sets the end time of the operation and invokes the callback to indicate that the
+ * operation has completed. Does nothing if the pipeline has been stopped.
+ * <p/>
+ * This assumes that the "outcome" is not {@code null}.
+ * <p/>
+ * Note: the start time must be a reference rather than a plain value, because it's
+ * value must be gotten on-demand, when the returned function is executed at a later
+ * time.
+ *
+ * @param callbacks used to determine if the end callback can be invoked
+ * @return a function that sets the end time and invokes the callback
+ */
+ private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
+ return (outcome, thrown) -> {
+ if (callbacks.canEnd()) {
+ outcome.setStart(callbacks.getStartTime());
+ outcome.setEnd(callbacks.getEndTime());
+ params.callbackCompleted(outcome);
+ }
+ };
+ }
+ /**
+ * Sets an operation's outcome and message, based on a throwable.
+ *
+ * @param operation operation to be updated
+ * @return the updated operation
+ */
+ protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
+ PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
+ return setOutcome(operation, result);
+ }
+ /**
+ * Sets an operation's outcome and default message based on the result.
+ *
+ * @param operation operation to be updated
+ * @param result result of the operation
+ * @return the updated operation
+ */
+ public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
+ logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
+ operation.setResult(result);
+ operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
+ : ControlLoopOperation.FAILED_MSG);
+ return operation;
+ }
+ /**
+ * Determines if a throwable is due to a timeout.
+ *
+ * @param thrown throwable of interest
+ * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
+ */
+ protected boolean isTimeout(Throwable thrown) {
+ if (thrown instanceof CompletionException) {
+ thrown = thrown.getCause();
+ }
+ return (thrown instanceof TimeoutException);
+ }
+ // these may be overridden by subclasses or junit tests
+ /**
+ * Gets the retry count.
+ *
+ * @param retry retry, extracted from the parameters, or {@code null}
+ * @return the number of retries, or {@code 0} if no retries were specified
+ */
+ protected int getRetry(Integer retry) {
+ return (retry == null ? 0 : retry);
+ }
+ /**
+ * Gets the retry wait, in milliseconds.
+ *
+ * @return the retry wait, in milliseconds
+ */
+ protected long getRetryWaitMs() {
+ }
+ /**
+ * Gets the operation timeout.
+ *
+ * @param timeoutSec timeout, in seconds, extracted from the parameters, or
+ * {@code null}
+ * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
+ * specified
+ */
+ protected long getTimeoutMs(Integer timeoutSec) {
+ return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
+ }
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
index df5258d71..3e15c1be4 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java
@@ -20,57 +20,24 @@
package org.onap.policy.controlloop.actorserviceprovider.impl;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import lombok.AccessLevel;
import lombok.Getter;
-import lombok.Setter;
-import org.onap.policy.controlloop.ControlLoopOperation;
-import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
-import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.PolicyResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * Partial implementation of an operator. In general, it's preferable that subclasses
- * would override
- * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome)
- * startOperationAsync()}. However, if that proves to be too difficult, then they can
- * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome)
- * doOperation()}. In addition, if the operation requires any preprocessor steps, the
- * subclass may choose to override
- * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}.
- * <p/>
- * The futures returned by the methods within this class can be canceled, and will
- * propagate the cancellation to any subtasks. Thus it is also expected that any futures
- * returned by overridden methods will do the same. Of course, if a class overrides
- * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()},
- * then there's little that can be done to cancel that particular operation.
+ * Partial implementation of an operator.
public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
- private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
* Executor to be used for tasks that may perform blocking I/O. The default executor
* simply launches a new thread for each command that is submitted to it.
* <p/>
- * May be overridden by junit tests.
+ * The "get" method may be overridden by junit tests.
- @Getter(AccessLevel.PROTECTED)
- @Setter(AccessLevel.PROTECTED)
- private Executor blockingExecutor = command -> {
+ @Getter
+ private final Executor blockingExecutor = command -> {
Thread thread = new Thread(command);
@@ -125,721 +92,4 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
protected void doShutdown() {
// do nothing
- @Override
- public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) {
- if (!isAlive()) {
- throw new IllegalStateException("operation is not running: " + getFullName());
- }
- // allocate a controller for the entire operation
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params);
- if (preproc == null) {
- // no preprocessor required - just start the operation
- return startOperationAttempt(params, controller, 1);
- }
- /*
- * Do preprocessor first and then, if successful, start the operation. Note:
- * operations create their own outcome, ignoring the outcome from any previous
- * steps.
- *
- * Wrap the preprocessor to ensure "stop" is propagated to it.
- */
- // @formatter:off
- controller.wrap(preproc)
- .exceptionally(fromException(params, "preprocessor of operation"))
- .thenCompose(handlePreprocessorFailure(params, controller))
- .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1));
- // @formatter:on
- return controller;
- }
- /**
- * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
- * invokes the call-backs, marks the controller complete, and returns an incomplete
- * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
- * received.
- * <p/>
- * Assumes that no callbacks have been invoked yet.
- *
- * @param params operation parameters
- * @param controller pipeline controller
- * @return a function that checks the outcome status and continues, if successful, or
- * indicates a failure otherwise
- */
- private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
- ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) {
- return outcome -> {
- if (outcome != null && isSuccess(outcome)) {
- logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(outcome);
- }
- logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
- final Executor executor = params.getExecutor();
- final CallbackManager callbacks = new CallbackManager();
- // propagate "stop" to the callbacks
- controller.add(callbacks);
- final OperationOutcome outcome2 = params.makeOutcome();
- // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
- outcome2.setResult(PolicyResult.FAILURE_GUARD);
- outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
- // @formatter:off
- CompletableFuture.completedFuture(outcome2)
- .whenCompleteAsync(callbackStarted(params, callbacks), executor)
- .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
- return new CompletableFuture<>();
- };
- }
- /**
- * Invokes the operation's preprocessor step(s) as a "future". This method simply
- * returns {@code null}.
- * <p/>
- * This method assumes the following:
- * <ul>
- * <li>the operator is alive</li>
- * <li>exceptions generated within the pipeline will be handled by the invoker</li>
- * </ul>
- *
- * @param params operation parameters
- * @return a function that will start the preprocessor and returns its outcome, or
- * {@code null} if this operation needs no preprocessor
- */
- protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
- return null;
- }
- /**
- * Starts the operation attempt, with no preprocessor. When all retries complete, it
- * will complete the controller.
- *
- * @param params operation parameters
- * @param controller controller for all operation attempts
- * @param attempt attempt number, typically starting with 1
- * @return a future that will return the final result of all attempts
- */
- private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params,
- PipelineControllerFuture<OperationOutcome> controller, int attempt) {
- // propagate "stop" to the operation attempt
- controller.wrap(startAttemptWithoutRetries(params, attempt))
- .thenCompose(retryOnFailure(params, controller, attempt));
- return controller;
- }
- /**
- * Starts the operation attempt, without doing any retries.
- *
- * @param params operation parameters
- * @param attempt attempt number, typically starting with 1
- * @return a future that will return the result of a single operation attempt
- */
- private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params,
- int attempt) {
- logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
- final Executor executor = params.getExecutor();
- final OperationOutcome outcome = params.makeOutcome();
- final CallbackManager callbacks = new CallbackManager();
- // this operation attempt gets its own controller
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- // propagate "stop" to the callbacks
- controller.add(callbacks);
- // @formatter:off
- CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
- .whenCompleteAsync(callbackStarted(params, callbacks), executor)
- .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2)));
- // @formatter:on
- // handle timeouts, if specified
- long timeoutMillis = getTimeOutMillis(params.getTimeoutSec());
- if (timeoutMillis > 0) {
- logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
- future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
- }
- /*
- * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
- * before callbackCompleted() is invoked.
- *
- * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
- * the pipeline as the last step anyway.
- */
- // @formatter:off
- future.exceptionally(fromException(params, "operation"))
- .thenApply(setRetryFlag(params, attempt))
- .whenCompleteAsync(callbackStarted(params, callbacks), executor)
- .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
- return controller;
- }
- /**
- * Determines if the outcome was successful.
- *
- * @param outcome outcome to examine
- * @return {@code true} if the outcome was successful
- */
- protected boolean isSuccess(OperationOutcome outcome) {
- return (outcome.getResult() == PolicyResult.SUCCESS);
- }
- /**
- * Determines if the outcome was a failure for this operator.
- *
- * @param outcome outcome to examine, or {@code null}
- * @return {@code true} if the outcome is not {@code null} and was a failure
- * <i>and</i> was associated with this operator, {@code false} otherwise
- */
- protected boolean isActorFailed(OperationOutcome outcome) {
- return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
- }
- /**
- * Determines if the given outcome is for this operation.
- *
- * @param outcome outcome to examine
- * @return {@code true} if the outcome is for this operation, {@code false} otherwise
- */
- protected boolean isSameOperation(OperationOutcome outcome) {
- return OperationOutcome.isFor(outcome, getActorName(), getName());
- }
- /**
- * Invokes the operation as a "future". This method simply invokes
- * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor
- * "blocking executor"}, returning the result via a "future".
- * <p/>
- * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
- * the executor in the "params", as that may bring the background thread pool to a
- * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
- * instead.
- * <p/>
- * This method assumes the following:
- * <ul>
- * <li>the operator is alive</li>
- * <li>verifyRunning() has been invoked</li>
- * <li>callbackStarted() has been invoked</li>
- * <li>the invoker will perform appropriate timeout checks</li>
- * <li>exceptions generated within the pipeline will be handled by the invoker</li>
- * </ul>
- *
- * @param params operation parameters
- * @param attempt attempt number, typically starting with 1
- * @return a function that will start the operation and return its result when
- * complete
- */
- protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt,
- OperationOutcome outcome) {
- return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor());
- }
- /**
- * Low-level method that performs the operation. This can make the same assumptions
- * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
- * particular method simply throws an {@link UnsupportedOperationException}.
- *
- * @param params operation parameters
- * @param attempt attempt number, typically starting with 1
- * @param operation the operation being performed
- * @return the outcome of the operation
- */
- protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) {
- throw new UnsupportedOperationException("start operation " + getFullName());
- }
- /**
- * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
- * FAILURE, assuming the policy specifies retries and the retry count has been
- * exhausted.
- *
- * @param params operation parameters
- * @param attempt latest attempt number, starting with 1
- * @return a function to get the next future to execute
- */
- private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) {
- return operation -> {
- if (operation != null && !isActorFailed(operation)) {
- /*
- * wrong type or wrong operation - just leave it as is. No need to log
- * anything here, as retryOnFailure() will log a message
- */
- return operation;
- }
- // get a non-null operation
- OperationOutcome oper2;
- if (operation != null) {
- oper2 = operation;
- } else {
- oper2 = params.makeOutcome();
- oper2.setResult(PolicyResult.FAILURE);
- }
- Integer retry = params.getRetry();
- if (retry != null && retry > 0 && attempt > retry) {
- /*
- * retries were specified and we've already tried them all - change to
- */
- logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
- oper2.setResult(PolicyResult.FAILURE_RETRIES);
- }
- return oper2;
- };
- }
- /**
- * Restarts the operation if it was a FAILURE. Assumes that
- * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
- * thus that the "operation" is not {@code null}.
- *
- * @param params operation parameters
- * @param controller controller for all of the retries
- * @param attempt latest attempt number, starting with 1
- * @return a function to get the next future to execute
- */
- private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
- ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller,
- int attempt) {
- return operation -> {
- if (!isActorFailed(operation)) {
- // wrong type or wrong operation - just leave it as is
- logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
- controller.complete(operation);
- return new CompletableFuture<>();
- }
- Integer retry = params.getRetry();
- if (retry == null || retry <= 0) {
- // no retries - already marked as FAILURE, so just return it
- logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
- controller.complete(operation);
- return new CompletableFuture<>();
- }
- /*
- * Retry the operation.
- */
- logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
- return startOperationAttempt(params, controller, attempt + 1);
- };
- }
- /**
- * Converts an exception into an operation outcome, returning a copy of the outcome to
- * prevent background jobs from changing it.
- *
- * @param params operation parameters
- * @param type type of item throwing the exception
- * @return a function that will convert an exception into an operation outcome
- */
- private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) {
- return thrown -> {
- OperationOutcome outcome = params.makeOutcome();
- logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
- params.getRequestId(), thrown);
- return setOutcome(params, outcome, thrown);
- };
- }
- /**
- * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
- * any outstanding futures when one completes.
- *
- * @param params operation parameters
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
- */
- protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
- List<CompletableFuture<OperationOutcome>> futures) {
- // convert list to an array
- @SuppressWarnings("rawtypes")
- CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
- @SuppressWarnings("unchecked")
- CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures);
- return result;
- }
- /**
- * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
- * outstanding futures when one completes.
- *
- * @param params operation parameters
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
- */
- protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
- final Executor executor = params.getExecutor();
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- attachFutures(controller, futures);
- // @formatter:off
- CompletableFuture.anyOf(futures)
- .thenApply(object -> (OperationOutcome) object)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
- return controller;
- }
- /**
- * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
- * the futures if returned future is canceled. The future returns the "worst" outcome,
- * based on priority (see {@link #detmPriority(OperationOutcome)}).
- *
- * @param params operation parameters
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
- */
- protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
- List<CompletableFuture<OperationOutcome>> futures) {
- // convert list to an array
- @SuppressWarnings("rawtypes")
- CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
- @SuppressWarnings("unchecked")
- CompletableFuture<OperationOutcome> result = allOf(params, arrFutures);
- return result;
- }
- /**
- * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
- * futures if returned future is canceled. The future returns the "worst" outcome,
- * based on priority (see {@link #detmPriority(OperationOutcome)}).
- *
- * @param params operation parameters
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
- */
- protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- attachFutures(controller, futures);
- OperationOutcome[] outcomes = new OperationOutcome[futures.length];
- @SuppressWarnings("rawtypes")
- CompletableFuture[] futures2 = new CompletableFuture[futures.length];
- // record the outcomes of each future when it completes
- for (int count = 0; count < futures2.length; ++count) {
- final int count2 = count;
- futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
- }
- CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes));
- return controller;
- }
- /**
- * Attaches the given futures to the controller.
- *
- * @param controller master controller for all of the futures
- * @param futures futures to be attached to the controller
- */
- private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
- // attach each task
- for (CompletableFuture<OperationOutcome> future : futures) {
- controller.add(future);
- }
- }
- /**
- * Combines the outcomes from a set of tasks.
- *
- * @param params operation parameters
- * @param future future to be completed with the combined result
- * @param outcomes outcomes to be examined
- */
- private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params,
- CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) {
- return (unused, thrown) -> {
- if (thrown != null) {
- future.completeExceptionally(thrown);
- return;
- }
- // identify the outcome with the highest priority
- OperationOutcome outcome = outcomes[0];
- int priority = detmPriority(outcome);
- // start with "1", as we've already dealt with "0"
- for (int count = 1; count < outcomes.length; ++count) {
- OperationOutcome outcome2 = outcomes[count];
- int priority2 = detmPriority(outcome2);
- if (priority2 > priority) {
- outcome = outcome2;
- priority = priority2;
- }
- }
- logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(),
- (outcome == null ? null : outcome.getResult()), params.getRequestId());
- future.complete(outcome);
- };
- }
- /**
- * Determines the priority of an outcome based on its result.
- *
- * @param outcome outcome to examine, or {@code null}
- * @return the outcome's priority
- */
- protected int detmPriority(OperationOutcome outcome) {
- if (outcome == null) {
- return 1;
- }
- switch (outcome.getResult()) {
- case SUCCESS:
- return 0;
- return 2;
- return 3;
- case FAILURE:
- return 4;
- return 5;
- default:
- return 6;
- }
- }
- /**
- * Performs a task, after verifying that the controller is still running. Also checks
- * that the previous outcome was successful, if specified.
- *
- * @param params operation parameters
- * @param controller overall pipeline controller
- * @param checkSuccess {@code true} to check the previous outcome, {@code false}
- * otherwise
- * @param outcome outcome of the previous task
- * @param tasks tasks to be performed
- * @return a function to perform the task. If everything checks out, then it returns
- * the task's future. Otherwise, it returns an incomplete future and completes
- * the controller instead.
- */
- // @formatter:off
- protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params,
- PipelineControllerFuture<OperationOutcome> controller,
- boolean checkSuccess, OperationOutcome outcome,
- CompletableFuture<OperationOutcome> task) {
- // @formatter:on
- if (checkSuccess && !isSuccess(outcome)) {
- /*
- * must complete before canceling so that cancel() doesn't cause controller to
- * complete
- */
- controller.complete(outcome);
- task.cancel(false);
- return new CompletableFuture<>();
- }
- return controller.wrap(task);
- }
- /**
- * Performs a task, after verifying that the controller is still running. Also checks
- * that the previous outcome was successful, if specified.
- *
- * @param params operation parameters
- * @param controller overall pipeline controller
- * @param checkSuccess {@code true} to check the previous outcome, {@code false}
- * otherwise
- * @param tasks tasks to be performed
- * @return a function to perform the task. If everything checks out, then it returns
- * the task's future. Otherwise, it returns an incomplete future and completes
- * the controller instead.
- */
- // @formatter:off
- protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params,
- PipelineControllerFuture<OperationOutcome> controller,
- boolean checkSuccess,
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
- // @formatter:on
- return outcome -> {
- if (!controller.isRunning()) {
- return new CompletableFuture<>();
- }
- if (checkSuccess && !isSuccess(outcome)) {
- controller.complete(outcome);
- return new CompletableFuture<>();
- }
- return controller.wrap(task.apply(outcome));
- };
- }
- /**
- * Sets the start time of the operation and invokes the callback to indicate that the
- * operation has started. Does nothing if the pipeline has been stopped.
- * <p/>
- * This assumes that the "outcome" is not {@code null}.
- *
- * @param params operation parameters
- * @param callbacks used to determine if the start callback can be invoked
- * @return a function that sets the start time and invokes the callback
- */
- private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params,
- CallbackManager callbacks) {
- return (outcome, thrown) -> {
- if (callbacks.canStart()) {
- // haven't invoked "start" callback yet
- outcome.setStart(callbacks.getStartTime());
- outcome.setEnd(null);
- params.callbackStarted(outcome);
- }
- };
- }
- /**
- * Sets the end time of the operation and invokes the callback to indicate that the
- * operation has completed. Does nothing if the pipeline has been stopped.
- * <p/>
- * This assumes that the "outcome" is not {@code null}.
- * <p/>
- * Note: the start time must be a reference rather than a plain value, because it's
- * value must be gotten on-demand, when the returned function is executed at a later
- * time.
- *
- * @param params operation parameters
- * @param callbacks used to determine if the end callback can be invoked
- * @return a function that sets the end time and invokes the callback
- */
- private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params,
- CallbackManager callbacks) {
- return (outcome, thrown) -> {
- if (callbacks.canEnd()) {
- outcome.setStart(callbacks.getStartTime());
- outcome.setEnd(callbacks.getEndTime());
- params.callbackCompleted(outcome);
- }
- };
- }
- /**
- * Sets an operation's outcome and message, based on a throwable.
- *
- * @param params operation parameters
- * @param operation operation to be updated
- * @return the updated operation
- */
- protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
- Throwable thrown) {
- PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
- return setOutcome(params, operation, result);
- }
- /**
- * Sets an operation's outcome and default message based on the result.
- *
- * @param params operation parameters
- * @param operation operation to be updated
- * @param result result of the operation
- * @return the updated operation
- */
- protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
- PolicyResult result) {
- logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
- operation.setResult(result);
- operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
- : ControlLoopOperation.FAILED_MSG);
- return operation;
- }
- /**
- * Determines if a throwable is due to a timeout.
- *
- * @param thrown throwable of interest
- * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
- */
- protected boolean isTimeout(Throwable thrown) {
- if (thrown instanceof CompletionException) {
- thrown = thrown.getCause();
- }
- return (thrown instanceof TimeoutException);
- }
- // these may be overridden by junit tests
- /**
- * Gets the operation timeout. Subclasses may override this method to obtain the
- * timeout in some other way (e.g., through configuration properties).
- *
- * @param timeoutSec timeout, in seconds, or {@code null}
- * @return the operation timeout, in milliseconds
- */
- protected long getTimeOutMillis(Integer timeoutSec) {
- return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
- }
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
index 57fce40d7..925916097 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java
@@ -148,7 +148,8 @@ public class ControlLoopOperationParams {
return actorService
- .startOperation(this);
+ .buildOperation(this)
+ .start();
// @formatter:on
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
index da4fb4f0c..275c8bc4e 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
@@ -51,7 +51,7 @@ public class HttpActorParams {
* indicates that it should wait forever. The default is zero.
- private long timeoutSec = 0;
+ private int timeoutSec = 0;
* Maps the operation name to its URI path.
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
index 695ffe4dd..93711c032 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
@@ -53,7 +53,7 @@ public class HttpParams {
- private long timeoutSec = 0;
+ private int timeoutSec = 0;
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
index 620950a3c..53bee5f00 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java
@@ -22,7 +22,6 @@
package org.onap.policy.controlloop.actorserviceprovider.spi;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;