diff options
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
14 files changed, 1274 insertions, 973 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java deleted file mode 100644 index d78403809..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/AsyncResponseHandler.java +++ /dev/null @@ -1,119 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import javax.ws.rs.client.InvocationCallback; -import lombok.AccessLevel; -import lombok.Getter; -import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handler for a <i>single</i> asynchronous response. - * - * @param <T> response type - */ -@Getter -public abstract class AsyncResponseHandler<T> implements InvocationCallback<T> { - - private static final Logger logger = LoggerFactory.getLogger(AsyncResponseHandler.class); - - @Getter(AccessLevel.NONE) - private final PipelineControllerFuture<OperationOutcome> result = new PipelineControllerFuture<>(); - private final ControlLoopOperationParams params; - private final OperationOutcome outcome; - - /** - * Constructs the object. - * - * @param params operation parameters - * @param outcome outcome to be populated based on the response - */ - public AsyncResponseHandler(ControlLoopOperationParams params, OperationOutcome outcome) { - this.params = params; - this.outcome = outcome; - } - - /** - * Handles the given future, arranging to cancel it when the response is received. - * - * @param future future to be handled - * @return a future to be used to cancel or wait for the response - */ - public CompletableFuture<OperationOutcome> handle(Future<T> future) { - result.add(future); - return result; - } - - /** - * Invokes {@link #doComplete()} and then completes "this" with the returned value. - */ - @Override - public void completed(T rawResponse) { - try { - logger.trace("{}.{}: response completed for {}", params.getActor(), params.getOperation(), - params.getRequestId()); - result.complete(doComplete(rawResponse)); - - } catch (RuntimeException e) { - logger.trace("{}.{}: response handler threw an exception for {}", params.getActor(), params.getOperation(), - params.getRequestId()); - result.completeExceptionally(e); - } - } - - /** - * Invokes {@link #doFailed()} and then completes "this" with the returned value. - */ - @Override - public void failed(Throwable throwable) { - try { - logger.trace("{}.{}: response failure for {}", params.getActor(), params.getOperation(), - params.getRequestId()); - result.complete(doFailed(throwable)); - - } catch (RuntimeException e) { - logger.trace("{}.{}: response failure handler threw an exception for {}", params.getActor(), - params.getOperation(), params.getRequestId()); - result.completeExceptionally(e); - } - } - - /** - * Completes the processing of a response. - * - * @param rawResponse raw response that was received - * @return the outcome - */ - protected abstract OperationOutcome doComplete(T rawResponse); - - /** - * Handles a response exception. - * - * @param thrown exception that was thrown - * @return the outcome - */ - protected abstract OperationOutcome doFailed(Throwable thrown); -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operation.java new file mode 100644 index 000000000..39977fd41 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operation.java @@ -0,0 +1,52 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider; + +import java.util.concurrent.CompletableFuture; + +/** + * This is the service interface for defining an Actor operation used in Control Loop + * Operational Policies for performing actions on runtime entities. + */ +public interface Operation { + + /** + * Gets the name of the associated actor. + * + * @return the name of the associated actor + */ + String getActorName(); + + /** + * Gets the name of the operation. + * + * @return the operation name + */ + String getName(); + + /** + * Called by enforcement PDP engine to start the operation. As part of the operation, + * it invokes the "start" and "complete" call-backs found within the parameters. + * + * @return a future that can be used to cancel or await the result of the operation + */ + CompletableFuture<OperationOutcome> start(); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java index c09460e34..24faafd40 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Operator.java @@ -21,7 +21,6 @@ package org.onap.policy.controlloop.actorserviceprovider; import java.util.Map; -import java.util.concurrent.CompletableFuture; import org.onap.policy.common.capabilities.Configurable; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; @@ -47,11 +46,10 @@ public interface Operator extends Startable, Configurable<Map<String, Object>> { String getName(); /** - * Called by enforcement PDP engine to start the operation. As part of the operation, - * it invokes the "start" and "complete" call-backs found within the parameters. + * Called by enforcement PDP engine to build the operation. * - * @param params parameters needed to start the operation - * @return a future that can be used to cancel or await the result of the operation + * @param params parameters needed by the operation + * @return a new operation */ - CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params); + Operation buildOperation(ControlLoopOperationParams params); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java index c3ddd17f3..b885b5c25 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java @@ -23,9 +23,6 @@ package org.onap.policy.controlloop.actorserviceprovider; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.Map; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.NetLoggerUtil; -import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; @@ -56,82 +53,6 @@ public class Util { } /** - * Logs a REST request. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param url request URL - * @param request request to be logged - */ - public static <T> void logRestRequest(String url, T request) { - logRestRequest(new StandardCoder(), url, request); - } - - /** - * Logs a REST request. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param coder coder to be used to pretty-print the request - * @param url request URL - * @param request request to be logged - */ - protected static <T> void logRestRequest(Coder coder, String url, T request) { - String json; - try { - if (request instanceof String) { - json = request.toString(); - } else { - json = coder.encode(request, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print request", e); - json = request.toString(); - } - - NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); - logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); - } - - /** - * Logs a REST response. If the response is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param url request URL - * @param response response to be logged - */ - public static <T> void logRestResponse(String url, T response) { - logRestResponse(new StandardCoder(), url, response); - } - - /** - * Logs a REST response. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param coder coder to be used to pretty-print the response - * @param url request URL - * @param response response to be logged - */ - protected static <T> void logRestResponse(Coder coder, String url, T response) { - String json; - try { - if (response == null) { - json = null; - } else if (response instanceof String) { - json = response.toString(); - } else { - json = coder.encode(response, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print response", e); - json = response.toString(); - } - - NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); - logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); - } - - /** * Runs a function and logs a message if it throws an exception. Does <i>not</i> * re-throw the exception. * diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java index cd4d2570f..1c37a8e0d 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java @@ -23,12 +23,15 @@ package org.onap.policy.controlloop.actorserviceprovider.controlloop; import java.io.Serializable; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; import lombok.Setter; import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; /** * Context associated with a control loop event. @@ -47,11 +50,23 @@ public class ControlLoopEventContext implements Serializable { */ private final Map<String, String> enrichment; + /** + * Set of properties that have been stored in the context. + */ @Getter(AccessLevel.NONE) @Setter(AccessLevel.NONE) private Map<String, Serializable> properties = new ConcurrentHashMap<>(); /** + * When {@link #obtain(String, ControlLoopOperationParams)} is invoked and the + * specified property is not found in {@link #properties}, it is retrieved. This holds + * the futures for the operations retrieving the properties. + */ + @Getter(AccessLevel.NONE) + @Setter(AccessLevel.NONE) + private transient Map<String, CompletableFuture<OperationOutcome>> retrievers = new ConcurrentHashMap<>(); + + /** * Request ID extracted from the event, or a generated value if the event has no * request id; never {@code null}. */ @@ -100,4 +115,34 @@ public class ControlLoopEventContext implements Serializable { public void setProperty(String name, Serializable value) { properties.put(name, value); } + + /** + * Obtains the given property. + * + * @param name name of the desired property + * @param params parameters needed to perform the operation to retrieve the desired + * property + * @return a future for retrieving the property, {@code null} if the property has + * already been retrieved + */ + public CompletableFuture<OperationOutcome> obtain(String name, ControlLoopOperationParams params) { + if (properties.containsKey(name)) { + return null; + } + + CompletableFuture<OperationOutcome> future = retrievers.get(name); + if (future != null) { + return future; + } + + future = params.start(); + + CompletableFuture<OperationOutcome> oldFuture = retrievers.putIfAbsent(name, future); + if (oldFuture != null) { + future.cancel(false); + return oldFuture; + } + + return future; + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java index d7f322e8a..0c88ebee2 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/ActorImpl.java @@ -91,7 +91,7 @@ public class ActorImpl extends StartConfigPartial<Map<String, Object>> implement public Operator getOperator(String name) { Operator operator = name2operator.get(name); if (operator == null) { - throw new IllegalArgumentException("unknown operation " + getName() + "." + name); + throw new IllegalArgumentException("unknown operator " + getName() + "." + name); } return operator; diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java new file mode 100644 index 000000000..c4bf5f484 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java @@ -0,0 +1,286 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.function.Function; +import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.core.Response; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Operator that uses HTTP. The operator's parameters must be an {@link HttpParams}. + * + * @param <T> response type + */ +@Getter +public abstract class HttpOperation<T> extends OperationPartial { + private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class); + private static final Coder coder = new StandardCoder(); + + /** + * Operator that created this operation. + */ + protected final HttpOperator operator; + + /** + * Response class. + */ + private final Class<T> responseClass; + + + /** + * Constructs the object. + * + * @param params operation parameters + * @param operator operator that created this operation + * @param clazz response class + */ + public HttpOperation(ControlLoopOperationParams params, HttpOperator operator, Class<T> clazz) { + super(params, operator); + this.operator = operator; + this.responseClass = clazz; + } + + /** + * If no timeout is specified, then it returns the operator's configured timeout. + */ + @Override + protected long getTimeoutMs(Integer timeoutSec) { + return (timeoutSec == null || timeoutSec == 0 ? operator.getTimeoutMs() : super.getTimeoutMs(timeoutSec)); + } + + /** + * Makes the request headers. This simply returns an empty map. + * + * @return request headers, a non-null, modifiable map + */ + protected Map<String, Object> makeHeaders() { + return new HashMap<>(); + } + + /** + * Gets the path to be used when performing the request; this is typically appended to + * the base URL. This method simply invokes {@link #getPath()}. + * + * @return the path URI suffix + */ + public String makePath() { + return operator.getPath(); + } + + /** + * Makes the URL to which the "get" request should be posted. This ir primarily used + * for logging purposes. This particular method returns the base URL appended with the + * return value from {@link #makePath()}. + * + * @return the URL to which from which to get + */ + public String makeUrl() { + return (operator.getClient().getBaseUrl() + makePath()); + } + + /** + * Arranges to handle a response. + * + * @param outcome outcome to be populate + * @param url URL to which to request was sent + * @param requester function to initiate the request and invoke the given callback + * when it completes + * @return a future for the response + */ + protected CompletableFuture<OperationOutcome> handleResponse(OperationOutcome outcome, String url, + Function<InvocationCallback<Response>, Future<Response>> requester) { + + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + final CompletableFuture<Response> future = new CompletableFuture<>(); + final Executor executor = params.getExecutor(); + + // arrange for the callback to complete "future" + InvocationCallback<Response> callback = new InvocationCallback<>() { + @Override + public void completed(Response response) { + future.complete(response); + } + + @Override + public void failed(Throwable throwable) { + logger.warn("{}.{}: response failure for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + future.completeExceptionally(throwable); + } + }; + + // start the request and arrange to cancel it if the controller is canceled + controller.add(requester.apply(callback)); + + // once "future" completes, process the response, and then complete the controller + future.thenApplyAsync(response -> processResponse(outcome, url, response), executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + + return controller; + } + + /** + * Processes a response. This method simply sets the outcome to SUCCESS. + * + * @param outcome outcome to be populate + * @param url URL to which to request was sent + * @param response raw response to process + * @return the outcome + */ + protected OperationOutcome processResponse(OperationOutcome outcome, String url, Response rawResponse) { + + logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); + + String strResponse = HttpClient.getBody(rawResponse, String.class); + + logRestResponse(url, strResponse); + + T response; + if (responseClass == String.class) { + response = responseClass.cast(strResponse); + + } else { + try { + response = makeCoder().decode(strResponse, responseClass); + } catch (CoderException e) { + logger.warn("{}.{} cannot decode response with http error code {} for {}", params.getActor(), + params.getOperation(), rawResponse.getStatus(), params.getRequestId(), e); + return setOutcome(outcome, PolicyResult.FAILURE_EXCEPTION); + } + } + + if (!isSuccess(rawResponse, response)) { + logger.info("{}.{} request failed with http error code {} for {}", params.getActor(), params.getOperation(), + rawResponse.getStatus(), params.getRequestId()); + return setOutcome(outcome, PolicyResult.FAILURE); + } + + logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); + setOutcome(outcome, PolicyResult.SUCCESS); + postProcessResponse(outcome, url, rawResponse, response); + + return outcome; + } + + /** + * Processes a successful response. + * + * @param outcome outcome to be populate + * @param url URL to which to request was sent + * @param rawResponse raw response + * @param response decoded response + */ + protected void postProcessResponse(OperationOutcome outcome, String url, Response rawResponse, T response) { + // do nothing + } + + /** + * Determines if the response indicates success. This method simply checks the HTTP + * status code. + * + * @param rawResponse raw response + * @param response decoded response + * @return {@code true} if the response indicates success, {@code false} otherwise + */ + protected boolean isSuccess(Response rawResponse, T response) { + return (rawResponse.getStatus() == 200); + } + + /** + * Logs a REST request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param request request to be logged + */ + public <Q> void logRestRequest(String url, Q request) { + String json; + try { + if (request == null) { + json = null; + } else if (request instanceof String) { + json = request.toString(); + } else { + json = makeCoder().encode(request, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print request", e); + json = request.toString(); + } + + NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); + logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + /** + * Logs a REST response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param url request URL + * @param response response to be logged + */ + public <S> void logRestResponse(String url, S response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = makeCoder().encode(response, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print response", e); + json = response.toString(); + } + + NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); + logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); + } + + // these may be overridden by junit tests + + protected Coder makeCoder() { + return coder; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java index 566492907..add74aa42 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperator.java @@ -21,31 +21,35 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; import java.util.Map; -import lombok.AccessLevel; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import lombok.Getter; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.http.client.HttpClientFactory; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; /** - * Operator that uses HTTP. The operator's parameters must be a {@link HttpParams}. + * Operator that uses HTTP. The operator's parameters must be an {@link HttpParams}. */ -public class HttpOperator extends OperatorPartial { +@Getter +public abstract class HttpOperator extends OperatorPartial { - @Getter(AccessLevel.PROTECTED) private HttpClient client; - @Getter - private long timeoutSec; + /** + * Default timeout, in milliseconds, if none specified in the request. + */ + private long timeoutMs; /** - * URI path for this particular operation. + * URI path for this particular operation. Includes a leading "/". */ - @Getter private String path; @@ -60,6 +64,26 @@ public class HttpOperator extends OperatorPartial { } /** + * Makes an operator that will construct operations. + * + * @param <T> response type + * @param actorName actor name + * @param operation operation name + * @param operationMaker function to make an operation + * @return a new operator + */ + public static <T> HttpOperator makeOperator(String actorName, String operation, + BiFunction<ControlLoopOperationParams, HttpOperator, HttpOperation<T>> operationMaker) { + + return new HttpOperator(actorName, operation) { + @Override + public Operation buildOperation(ControlLoopOperationParams params) { + return operationMaker.apply(params, this); + } + }; + } + + /** * Translates the parameters to an {@link HttpParams} and then extracts the relevant * values. */ @@ -73,10 +97,10 @@ public class HttpOperator extends OperatorPartial { client = getClientFactory().get(params.getClientName()); path = params.getPath(); - timeoutSec = params.getTimeoutSec(); + timeoutMs = TimeUnit.MILLISECONDS.convert(params.getTimeoutSec(), TimeUnit.SECONDS); } - // these may be overridden by junits + // these may be overridden by junit tests protected HttpClientFactory getClientFactory() { return HttpClientFactoryInstance.getClientFactory(); diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java new file mode 100644 index 000000000..d00b88bb5 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -0,0 +1,844 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Partial implementation of an operator. In general, it's preferable that subclasses + * would override {@link #startOperationAsync(int, OperationOutcome) + * startOperationAsync()}. However, if that proves to be too difficult, then they can + * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition, + * if the operation requires any preprocessor steps, the subclass may choose to override + * {@link #startPreprocessorAsync()}. + * <p/> + * The futures returned by the methods within this class can be canceled, and will + * propagate the cancellation to any subtasks. Thus it is also expected that any futures + * returned by overridden methods will do the same. Of course, if a class overrides + * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can + * be done to cancel that particular operation. + */ +public abstract class OperationPartial implements Operation { + + private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class); + public static final long DEFAULT_RETRY_WAIT_MS = 1000L; + + // values extracted from the operator + + private final OperatorPartial operator; + + /** + * Operation parameters. + */ + protected final ControlLoopOperationParams params; + + + /** + * Constructs the object. + * + * @param params operation parameters + * @param operator operator that created this operation + */ + public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) { + this.params = params; + this.operator = operator; + } + + public Executor getBlockingExecutor() { + return operator.getBlockingExecutor(); + } + + public String getFullName() { + return operator.getFullName(); + } + + public String getActorName() { + return operator.getActorName(); + } + + public String getName() { + return operator.getName(); + } + + @Override + public final CompletableFuture<OperationOutcome> start() { + if (!operator.isAlive()) { + throw new IllegalStateException("operation is not running: " + getFullName()); + } + + // allocate a controller for the entire operation + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(); + if (preproc == null) { + // no preprocessor required - just start the operation + return startOperationAttempt(controller, 1); + } + + /* + * Do preprocessor first and then, if successful, start the operation. Note: + * operations create their own outcome, ignoring the outcome from any previous + * steps. + * + * Wrap the preprocessor to ensure "stop" is propagated to it. + */ + // @formatter:off + controller.wrap(preproc) + .exceptionally(fromException("preprocessor of operation")) + .thenCompose(handlePreprocessorFailure(controller)) + .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1)) + .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); + // @formatter:on + + return controller; + } + + /** + * Handles a failure in the preprocessor pipeline. If a failure occurred, then it + * invokes the call-backs, marks the controller complete, and returns an incomplete + * future, effectively halting the pipeline. Otherwise, it returns the outcome that it + * received. + * <p/> + * Assumes that no callbacks have been invoked yet. + * + * @param controller pipeline controller + * @return a function that checks the outcome status and continues, if successful, or + * indicates a failure otherwise + */ + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure( + PipelineControllerFuture<OperationOutcome> controller) { + + return outcome -> { + + if (outcome != null && isSuccess(outcome)) { + logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId()); + return CompletableFuture.completedFuture(outcome); + } + + logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId()); + + final Executor executor = params.getExecutor(); + final CallbackManager callbacks = new CallbackManager(); + + // propagate "stop" to the callbacks + controller.add(callbacks); + + final OperationOutcome outcome2 = params.makeOutcome(); + + // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) + + outcome2.setResult(PolicyResult.FAILURE_GUARD); + outcome2.setMessage(outcome != null ? outcome.getMessage() : null); + + // @formatter:off + CompletableFuture.completedFuture(outcome2) + .whenCompleteAsync(callbackStarted(callbacks), executor) + .whenCompleteAsync(callbackCompleted(callbacks), executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return new CompletableFuture<>(); + }; + } + + /** + * Invokes the operation's preprocessor step(s) as a "future". This method simply + * invokes {@link #startGuardAsync()}. + * <p/> + * This method assumes the following: + * <ul> + * <li>the operator is alive</li> + * <li>exceptions generated within the pipeline will be handled by the invoker</li> + * </ul> + * + * @return a function that will start the preprocessor and returns its outcome, or + * {@code null} if this operation needs no preprocessor + */ + protected CompletableFuture<OperationOutcome> startPreprocessorAsync() { + return startGuardAsync(); + } + + /** + * Invokes the operation's guard step(s) as a "future". This method simply returns + * {@code null}. + * <p/> + * This method assumes the following: + * <ul> + * <li>the operator is alive</li> + * <li>exceptions generated within the pipeline will be handled by the invoker</li> + * </ul> + * + * @return a function that will start the guard checks and returns its outcome, or + * {@code null} if this operation has no guard + */ + protected CompletableFuture<OperationOutcome> startGuardAsync() { + return null; + } + + /** + * Starts the operation attempt, with no preprocessor. When all retries complete, it + * will complete the controller. + * + * @param controller controller for all operation attempts + * @param attempt attempt number, typically starting with 1 + * @return a future that will return the final result of all attempts + */ + private CompletableFuture<OperationOutcome> startOperationAttempt( + PipelineControllerFuture<OperationOutcome> controller, int attempt) { + + // propagate "stop" to the operation attempt + controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt)) + .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); + + return controller; + } + + /** + * Starts the operation attempt, without doing any retries. + * + * @param params operation parameters + * @param attempt attempt number, typically starting with 1 + * @return a future that will return the result of a single operation attempt + */ + private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) { + + logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); + + final Executor executor = params.getExecutor(); + final OperationOutcome outcome = params.makeOutcome(); + final CallbackManager callbacks = new CallbackManager(); + + // this operation attempt gets its own controller + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + // propagate "stop" to the callbacks + controller.add(callbacks); + + // @formatter:off + CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome) + .whenCompleteAsync(callbackStarted(callbacks), executor) + .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2))); + // @formatter:on + + // handle timeouts, if specified + long timeoutMillis = getTimeoutMs(params.getTimeoutSec()); + if (timeoutMillis > 0) { + logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId()); + future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + } + + /* + * Note: we re-invoke callbackStarted() just to be sure the callback is invoked + * before callbackCompleted() is invoked. + * + * Note: no need to remove "callbacks" from the pipeline, as we're going to stop + * the pipeline as the last step anyway. + */ + + // @formatter:off + future.exceptionally(fromException("operation")) + .thenApply(setRetryFlag(attempt)) + .whenCompleteAsync(callbackStarted(callbacks), executor) + .whenCompleteAsync(callbackCompleted(callbacks), executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; + } + + /** + * Determines if the outcome was successful. + * + * @param outcome outcome to examine + * @return {@code true} if the outcome was successful + */ + protected boolean isSuccess(OperationOutcome outcome) { + return (outcome.getResult() == PolicyResult.SUCCESS); + } + + /** + * Determines if the outcome was a failure for this operator. + * + * @param outcome outcome to examine, or {@code null} + * @return {@code true} if the outcome is not {@code null} and was a failure + * <i>and</i> was associated with this operator, {@code false} otherwise + */ + protected boolean isActorFailed(OperationOutcome outcome) { + return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); + } + + /** + * Determines if the given outcome is for this operation. + * + * @param outcome outcome to examine + * @return {@code true} if the outcome is for this operation, {@code false} otherwise + */ + protected boolean isSameOperation(OperationOutcome outcome) { + return OperationOutcome.isFor(outcome, getActorName(), getName()); + } + + /** + * Invokes the operation as a "future". This method simply invokes + * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"}, + * returning the result via a "future". + * <p/> + * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using + * the executor in the "params", as that may bring the background thread pool to a + * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used + * instead. + * <p/> + * This method assumes the following: + * <ul> + * <li>the operator is alive</li> + * <li>verifyRunning() has been invoked</li> + * <li>callbackStarted() has been invoked</li> + * <li>the invoker will perform appropriate timeout checks</li> + * <li>exceptions generated within the pipeline will be handled by the invoker</li> + * </ul> + * + * @param attempt attempt number, typically starting with 1 + * @return a function that will start the operation and return its result when + * complete + */ + protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) { + + return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor()); + } + + /** + * Low-level method that performs the operation. This can make the same assumptions + * that are made by {@link #doOperationAsFuture()}. This particular method simply + * throws an {@link UnsupportedOperationException}. + * + * @param attempt attempt number, typically starting with 1 + * @param operation the operation being performed + * @return the outcome of the operation + */ + protected OperationOutcome doOperation(int attempt, OperationOutcome operation) { + + throw new UnsupportedOperationException("start operation " + getFullName()); + } + + /** + * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is + * FAILURE, assuming the policy specifies retries and the retry count has been + * exhausted. + * + * @param attempt latest attempt number, starting with 1 + * @return a function to get the next future to execute + */ + private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) { + + return operation -> { + if (operation != null && !isActorFailed(operation)) { + /* + * wrong type or wrong operation - just leave it as is. No need to log + * anything here, as retryOnFailure() will log a message + */ + return operation; + } + + // get a non-null operation + OperationOutcome oper2; + if (operation != null) { + oper2 = operation; + } else { + oper2 = params.makeOutcome(); + oper2.setResult(PolicyResult.FAILURE); + } + + int retry = getRetry(params.getRetry()); + if (retry > 0 && attempt > retry) { + /* + * retries were specified and we've already tried them all - change to + * FAILURE_RETRIES + */ + logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); + oper2.setResult(PolicyResult.FAILURE_RETRIES); + } + + return oper2; + }; + } + + /** + * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)} + * was previously invoked, and thus that the "operation" is not {@code null}. + * + * @param controller controller for all of the retries + * @param attempt latest attempt number, starting with 1 + * @return a function to get the next future to execute + */ + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure( + PipelineControllerFuture<OperationOutcome> controller, int attempt) { + + return operation -> { + if (!isActorFailed(operation)) { + // wrong type or wrong operation - just leave it as is + logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId()); + controller.complete(operation); + return new CompletableFuture<>(); + } + + if (getRetry(params.getRetry()) <= 0) { + // no retries - already marked as FAILURE, so just return it + logger.info("operation {} no retries for {}", getFullName(), params.getRequestId()); + controller.complete(operation); + return new CompletableFuture<>(); + } + + /* + * Retry the operation. + */ + long waitMs = getRetryWaitMs(); + logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId()); + + return sleep(waitMs, TimeUnit.MILLISECONDS) + .thenCompose(unused -> startOperationAttempt(controller, attempt + 1)); + }; + } + + /** + * Convenience method that starts a sleep(), running via a future. + * + * @param sleepTime time to sleep + * @param unit time unit + * @return a future that will complete when the sleep completes + */ + protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) { + if (sleepTime <= 0) { + return CompletableFuture.completedFuture(null); + } + + return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit); + } + + /** + * Converts an exception into an operation outcome, returning a copy of the outcome to + * prevent background jobs from changing it. + * + * @param type type of item throwing the exception + * @return a function that will convert an exception into an operation outcome + */ + private Function<Throwable, OperationOutcome> fromException(String type) { + + return thrown -> { + OperationOutcome outcome = params.makeOutcome(); + + logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId(), thrown); + + return setOutcome(outcome, thrown); + }; + } + + /** + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. + * + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) { + + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = anyOf(arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any + * outstanding futures when one completes. + * + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> anyOf( + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + if (futures.length == 1) { + return futures[0]; + } + + final Executor executor = params.getExecutor(); + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + // @formatter:off + CompletableFuture.anyOf(futures) + .thenApply(object -> (OperationOutcome) object) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; + } + + /** + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels + * the futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) { + + // convert list to an array + @SuppressWarnings("rawtypes") + CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = allOf(arrFutures); + return result; + } + + /** + * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the + * futures if returned future is canceled. The future returns the "worst" outcome, + * based on priority (see {@link #detmPriority(OperationOutcome)}). + * + * @param futures futures for which to wait + * @return a future to cancel or await an outcome. If this future is canceled, then + * all of the futures will be canceled + */ + protected CompletableFuture<OperationOutcome> allOf( + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + if (futures.length == 1) { + return futures[0]; + } + + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + attachFutures(controller, futures); + + OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + + @SuppressWarnings("rawtypes") + CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + + // record the outcomes of each future when it completes + for (int count = 0; count < futures2.length; ++count) { + final int count2 = count; + futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); + } + + // @formatter:off + CompletableFuture.allOf(futures2) + .thenApply(unused -> combineOutcomes(outcomes)) + .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); + // @formatter:on + + return controller; + } + + /** + * Attaches the given futures to the controller. + * + * @param controller master controller for all of the futures + * @param futures futures to be attached to the controller + */ + private void attachFutures(PipelineControllerFuture<OperationOutcome> controller, + @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + + if (futures.length == 0) { + throw new IllegalArgumentException("empty list of futures"); + } + + // attach each task + for (CompletableFuture<OperationOutcome> future : futures) { + controller.add(future); + } + } + + /** + * Combines the outcomes from a set of tasks. + * + * @param outcomes outcomes to be examined + * @return the combined outcome + */ + private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) { + + // identify the outcome with the highest priority + OperationOutcome outcome = outcomes[0]; + int priority = detmPriority(outcome); + + // start with "1", as we've already dealt with "0" + for (int count = 1; count < outcomes.length; ++count) { + OperationOutcome outcome2 = outcomes[count]; + int priority2 = detmPriority(outcome2); + + if (priority2 > priority) { + outcome = outcome2; + priority = priority2; + } + } + + logger.info("{}: combined outcome of tasks is {} for {}", getFullName(), + (outcome == null ? null : outcome.getResult()), params.getRequestId()); + + return outcome; + } + + /** + * Determines the priority of an outcome based on its result. + * + * @param outcome outcome to examine, or {@code null} + * @return the outcome's priority + */ + protected int detmPriority(OperationOutcome outcome) { + if (outcome == null || outcome.getResult() == null) { + return 1; + } + + switch (outcome.getResult()) { + case SUCCESS: + return 0; + + case FAILURE_GUARD: + return 2; + + case FAILURE_RETRIES: + return 3; + + case FAILURE: + return 4; + + case FAILURE_TIMEOUT: + return 5; + + case FAILURE_EXCEPTION: + default: + return 6; + } + } + + /** + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. + * + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param outcome outcome of the previous task + * @param task task to be performed + * @return the task, if everything checks out. Otherwise, it returns an incomplete + * future and completes the controller instead + */ + // @formatter:off + protected CompletableFuture<OperationOutcome> doTask( + PipelineControllerFuture<OperationOutcome> controller, + boolean checkSuccess, OperationOutcome outcome, + CompletableFuture<OperationOutcome> task) { + // @formatter:on + + if (checkSuccess && !isSuccess(outcome)) { + /* + * must complete before canceling so that cancel() doesn't cause controller to + * complete + */ + controller.complete(outcome); + task.cancel(false); + return new CompletableFuture<>(); + } + + return controller.wrap(task); + } + + /** + * Performs a task, after verifying that the controller is still running. Also checks + * that the previous outcome was successful, if specified. + * + * @param controller overall pipeline controller + * @param checkSuccess {@code true} to check the previous outcome, {@code false} + * otherwise + * @param task function to start the task to be performed + * @return a function to perform the task. If everything checks out, then it returns + * the task. Otherwise, it returns an incomplete future and completes the + * controller instead + */ + // @formatter:off + protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask( + PipelineControllerFuture<OperationOutcome> controller, + boolean checkSuccess, + Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) { + // @formatter:on + + return outcome -> { + + if (!controller.isRunning()) { + return new CompletableFuture<>(); + } + + if (checkSuccess && !isSuccess(outcome)) { + controller.complete(outcome); + return new CompletableFuture<>(); + } + + return controller.wrap(task.apply(outcome)); + }; + } + + /** + * Sets the start time of the operation and invokes the callback to indicate that the + * operation has started. Does nothing if the pipeline has been stopped. + * <p/> + * This assumes that the "outcome" is not {@code null}. + * + * @param callbacks used to determine if the start callback can be invoked + * @return a function that sets the start time and invokes the callback + */ + private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) { + + return (outcome, thrown) -> { + + if (callbacks.canStart()) { + // haven't invoked "start" callback yet + outcome.setStart(callbacks.getStartTime()); + outcome.setEnd(null); + params.callbackStarted(outcome); + } + }; + } + + /** + * Sets the end time of the operation and invokes the callback to indicate that the + * operation has completed. Does nothing if the pipeline has been stopped. + * <p/> + * This assumes that the "outcome" is not {@code null}. + * <p/> + * Note: the start time must be a reference rather than a plain value, because it's + * value must be gotten on-demand, when the returned function is executed at a later + * time. + * + * @param callbacks used to determine if the end callback can be invoked + * @return a function that sets the end time and invokes the callback + */ + private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) { + + return (outcome, thrown) -> { + + if (callbacks.canEnd()) { + outcome.setStart(callbacks.getStartTime()); + outcome.setEnd(callbacks.getEndTime()); + params.callbackCompleted(outcome); + } + }; + } + + /** + * Sets an operation's outcome and message, based on a throwable. + * + * @param operation operation to be updated + * @return the updated operation + */ + protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) { + PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); + return setOutcome(operation, result); + } + + /** + * Sets an operation's outcome and default message based on the result. + * + * @param operation operation to be updated + * @param result result of the operation + * @return the updated operation + */ + public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) { + logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); + operation.setResult(result); + operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG + : ControlLoopOperation.FAILED_MSG); + + return operation; + } + + /** + * Determines if a throwable is due to a timeout. + * + * @param thrown throwable of interest + * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise + */ + protected boolean isTimeout(Throwable thrown) { + if (thrown instanceof CompletionException) { + thrown = thrown.getCause(); + } + + return (thrown instanceof TimeoutException); + } + + // these may be overridden by subclasses or junit tests + + /** + * Gets the retry count. + * + * @param retry retry, extracted from the parameters, or {@code null} + * @return the number of retries, or {@code 0} if no retries were specified + */ + protected int getRetry(Integer retry) { + return (retry == null ? 0 : retry); + } + + /** + * Gets the retry wait, in milliseconds. + * + * @return the retry wait, in milliseconds + */ + protected long getRetryWaitMs() { + return DEFAULT_RETRY_WAIT_MS; + } + + /** + * Gets the operation timeout. + * + * @param timeoutSec timeout, in seconds, extracted from the parameters, or + * {@code null} + * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was + * specified + */ + protected long getTimeoutMs(Integer timeoutSec) { + return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java index df5258d71..3e15c1be4 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperatorPartial.java @@ -20,57 +20,24 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; -import java.util.function.Function; -import lombok.AccessLevel; import lombok.Getter; -import lombok.Setter; -import org.onap.policy.controlloop.ControlLoopOperation; -import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; -import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Operator; -import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.policy.PolicyResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Partial implementation of an operator. In general, it's preferable that subclasses - * would override - * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome) - * startOperationAsync()}. However, if that proves to be too difficult, then they can - * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) - * doOperation()}. In addition, if the operation requires any preprocessor steps, the - * subclass may choose to override - * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}. - * <p/> - * The futures returned by the methods within this class can be canceled, and will - * propagate the cancellation to any subtasks. Thus it is also expected that any futures - * returned by overridden methods will do the same. Of course, if a class overrides - * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()}, - * then there's little that can be done to cancel that particular operation. + * Partial implementation of an operator. */ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator { - private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class); - /** * Executor to be used for tasks that may perform blocking I/O. The default executor * simply launches a new thread for each command that is submitted to it. * <p/> - * May be overridden by junit tests. + * The "get" method may be overridden by junit tests. */ - @Getter(AccessLevel.PROTECTED) - @Setter(AccessLevel.PROTECTED) - private Executor blockingExecutor = command -> { + @Getter + private final Executor blockingExecutor = command -> { Thread thread = new Thread(command); thread.setDaemon(true); thread.start(); @@ -125,721 +92,4 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj protected void doShutdown() { // do nothing } - - @Override - public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) { - if (!isAlive()) { - throw new IllegalStateException("operation is not running: " + getFullName()); - } - - // allocate a controller for the entire operation - final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params); - if (preproc == null) { - // no preprocessor required - just start the operation - return startOperationAttempt(params, controller, 1); - } - - /* - * Do preprocessor first and then, if successful, start the operation. Note: - * operations create their own outcome, ignoring the outcome from any previous - * steps. - * - * Wrap the preprocessor to ensure "stop" is propagated to it. - */ - // @formatter:off - controller.wrap(preproc) - .exceptionally(fromException(params, "preprocessor of operation")) - .thenCompose(handlePreprocessorFailure(params, controller)) - .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1)); - // @formatter:on - - return controller; - } - - /** - * Handles a failure in the preprocessor pipeline. If a failure occurred, then it - * invokes the call-backs, marks the controller complete, and returns an incomplete - * future, effectively halting the pipeline. Otherwise, it returns the outcome that it - * received. - * <p/> - * Assumes that no callbacks have been invoked yet. - * - * @param params operation parameters - * @param controller pipeline controller - * @return a function that checks the outcome status and continues, if successful, or - * indicates a failure otherwise - */ - private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure( - ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) { - - return outcome -> { - - if (outcome != null && isSuccess(outcome)) { - logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(outcome); - } - - logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId()); - - final Executor executor = params.getExecutor(); - final CallbackManager callbacks = new CallbackManager(); - - // propagate "stop" to the callbacks - controller.add(callbacks); - - final OperationOutcome outcome2 = params.makeOutcome(); - - // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) - - outcome2.setResult(PolicyResult.FAILURE_GUARD); - outcome2.setMessage(outcome != null ? outcome.getMessage() : null); - - // @formatter:off - CompletableFuture.completedFuture(outcome2) - .whenCompleteAsync(callbackStarted(params, callbacks), executor) - .whenCompleteAsync(callbackCompleted(params, callbacks), executor) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on - - return new CompletableFuture<>(); - }; - } - - /** - * Invokes the operation's preprocessor step(s) as a "future". This method simply - * returns {@code null}. - * <p/> - * This method assumes the following: - * <ul> - * <li>the operator is alive</li> - * <li>exceptions generated within the pipeline will be handled by the invoker</li> - * </ul> - * - * @param params operation parameters - * @return a function that will start the preprocessor and returns its outcome, or - * {@code null} if this operation needs no preprocessor - */ - protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) { - return null; - } - - /** - * Starts the operation attempt, with no preprocessor. When all retries complete, it - * will complete the controller. - * - * @param params operation parameters - * @param controller controller for all operation attempts - * @param attempt attempt number, typically starting with 1 - * @return a future that will return the final result of all attempts - */ - private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params, - PipelineControllerFuture<OperationOutcome> controller, int attempt) { - - // propagate "stop" to the operation attempt - controller.wrap(startAttemptWithoutRetries(params, attempt)) - .thenCompose(retryOnFailure(params, controller, attempt)); - - return controller; - } - - /** - * Starts the operation attempt, without doing any retries. - * - * @param params operation parameters - * @param attempt attempt number, typically starting with 1 - * @return a future that will return the result of a single operation attempt - */ - private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params, - int attempt) { - - logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); - - final Executor executor = params.getExecutor(); - final OperationOutcome outcome = params.makeOutcome(); - final CallbackManager callbacks = new CallbackManager(); - - // this operation attempt gets its own controller - final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - // propagate "stop" to the callbacks - controller.add(callbacks); - - // @formatter:off - CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome) - .whenCompleteAsync(callbackStarted(params, callbacks), executor) - .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2))); - // @formatter:on - - // handle timeouts, if specified - long timeoutMillis = getTimeOutMillis(params.getTimeoutSec()); - if (timeoutMillis > 0) { - logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId()); - future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); - } - - /* - * Note: we re-invoke callbackStarted() just to be sure the callback is invoked - * before callbackCompleted() is invoked. - * - * Note: no need to remove "callbacks" from the pipeline, as we're going to stop - * the pipeline as the last step anyway. - */ - - // @formatter:off - future.exceptionally(fromException(params, "operation")) - .thenApply(setRetryFlag(params, attempt)) - .whenCompleteAsync(callbackStarted(params, callbacks), executor) - .whenCompleteAsync(callbackCompleted(params, callbacks), executor) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on - - return controller; - } - - /** - * Determines if the outcome was successful. - * - * @param outcome outcome to examine - * @return {@code true} if the outcome was successful - */ - protected boolean isSuccess(OperationOutcome outcome) { - return (outcome.getResult() == PolicyResult.SUCCESS); - } - - /** - * Determines if the outcome was a failure for this operator. - * - * @param outcome outcome to examine, or {@code null} - * @return {@code true} if the outcome is not {@code null} and was a failure - * <i>and</i> was associated with this operator, {@code false} otherwise - */ - protected boolean isActorFailed(OperationOutcome outcome) { - return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); - } - - /** - * Determines if the given outcome is for this operation. - * - * @param outcome outcome to examine - * @return {@code true} if the outcome is for this operation, {@code false} otherwise - */ - protected boolean isSameOperation(OperationOutcome outcome) { - return OperationOutcome.isFor(outcome, getActorName(), getName()); - } - - /** - * Invokes the operation as a "future". This method simply invokes - * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor - * "blocking executor"}, returning the result via a "future". - * <p/> - * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using - * the executor in the "params", as that may bring the background thread pool to a - * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used - * instead. - * <p/> - * This method assumes the following: - * <ul> - * <li>the operator is alive</li> - * <li>verifyRunning() has been invoked</li> - * <li>callbackStarted() has been invoked</li> - * <li>the invoker will perform appropriate timeout checks</li> - * <li>exceptions generated within the pipeline will be handled by the invoker</li> - * </ul> - * - * @param params operation parameters - * @param attempt attempt number, typically starting with 1 - * @return a function that will start the operation and return its result when - * complete - */ - protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt, - OperationOutcome outcome) { - - return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor()); - } - - /** - * Low-level method that performs the operation. This can make the same assumptions - * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This - * particular method simply throws an {@link UnsupportedOperationException}. - * - * @param params operation parameters - * @param attempt attempt number, typically starting with 1 - * @param operation the operation being performed - * @return the outcome of the operation - */ - protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) { - - throw new UnsupportedOperationException("start operation " + getFullName()); - } - - /** - * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is - * FAILURE, assuming the policy specifies retries and the retry count has been - * exhausted. - * - * @param params operation parameters - * @param attempt latest attempt number, starting with 1 - * @return a function to get the next future to execute - */ - private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) { - - return operation -> { - if (operation != null && !isActorFailed(operation)) { - /* - * wrong type or wrong operation - just leave it as is. No need to log - * anything here, as retryOnFailure() will log a message - */ - return operation; - } - - // get a non-null operation - OperationOutcome oper2; - if (operation != null) { - oper2 = operation; - } else { - oper2 = params.makeOutcome(); - oper2.setResult(PolicyResult.FAILURE); - } - - Integer retry = params.getRetry(); - if (retry != null && retry > 0 && attempt > retry) { - /* - * retries were specified and we've already tried them all - change to - * FAILURE_RETRIES - */ - logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); - oper2.setResult(PolicyResult.FAILURE_RETRIES); - } - - return oper2; - }; - } - - /** - * Restarts the operation if it was a FAILURE. Assumes that - * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and - * thus that the "operation" is not {@code null}. - * - * @param params operation parameters - * @param controller controller for all of the retries - * @param attempt latest attempt number, starting with 1 - * @return a function to get the next future to execute - */ - private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure( - ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller, - int attempt) { - - return operation -> { - if (!isActorFailed(operation)) { - // wrong type or wrong operation - just leave it as is - logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId()); - controller.complete(operation); - return new CompletableFuture<>(); - } - - Integer retry = params.getRetry(); - if (retry == null || retry <= 0) { - // no retries - already marked as FAILURE, so just return it - logger.info("operation {} no retries for {}", getFullName(), params.getRequestId()); - controller.complete(operation); - return new CompletableFuture<>(); - } - - - /* - * Retry the operation. - */ - logger.info("retry operation {} for {}", getFullName(), params.getRequestId()); - - return startOperationAttempt(params, controller, attempt + 1); - }; - } - - /** - * Converts an exception into an operation outcome, returning a copy of the outcome to - * prevent background jobs from changing it. - * - * @param params operation parameters - * @param type type of item throwing the exception - * @return a function that will convert an exception into an operation outcome - */ - private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) { - - return thrown -> { - OperationOutcome outcome = params.makeOutcome(); - - logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), - params.getRequestId(), thrown); - - return setOutcome(params, outcome, thrown); - }; - } - - /** - * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels - * any outstanding futures when one completes. - * - * @param params operation parameters - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled - */ - protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params, - List<CompletableFuture<OperationOutcome>> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); - - @SuppressWarnings("unchecked") - CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures); - return result; - } - - /** - * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any - * outstanding futures when one completes. - * - * @param params operation parameters - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled - */ - protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params, - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { - - final Executor executor = params.getExecutor(); - final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); - - // @formatter:off - CompletableFuture.anyOf(futures) - .thenApply(object -> (OperationOutcome) object) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on - - return controller; - } - - /** - * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels - * the futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). - * - * @param params operation parameters - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled - */ - protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params, - List<CompletableFuture<OperationOutcome>> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); - - @SuppressWarnings("unchecked") - CompletableFuture<OperationOutcome> result = allOf(params, arrFutures); - return result; - } - - /** - * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the - * futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). - * - * @param params operation parameters - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled - */ - protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params, - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { - - final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); - - OperationOutcome[] outcomes = new OperationOutcome[futures.length]; - - @SuppressWarnings("rawtypes") - CompletableFuture[] futures2 = new CompletableFuture[futures.length]; - - // record the outcomes of each future when it completes - for (int count = 0; count < futures2.length; ++count) { - final int count2 = count; - futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); - } - - CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes)); - - return controller; - } - - /** - * Attaches the given futures to the controller. - * - * @param controller master controller for all of the futures - * @param futures futures to be attached to the controller - */ - private void attachFutures(PipelineControllerFuture<OperationOutcome> controller, - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { - - // attach each task - for (CompletableFuture<OperationOutcome> future : futures) { - controller.add(future); - } - } - - /** - * Combines the outcomes from a set of tasks. - * - * @param params operation parameters - * @param future future to be completed with the combined result - * @param outcomes outcomes to be examined - */ - private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params, - CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) { - - return (unused, thrown) -> { - if (thrown != null) { - future.completeExceptionally(thrown); - return; - } - - // identify the outcome with the highest priority - OperationOutcome outcome = outcomes[0]; - int priority = detmPriority(outcome); - - // start with "1", as we've already dealt with "0" - for (int count = 1; count < outcomes.length; ++count) { - OperationOutcome outcome2 = outcomes[count]; - int priority2 = detmPriority(outcome2); - - if (priority2 > priority) { - outcome = outcome2; - priority = priority2; - } - } - - logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(), - (outcome == null ? null : outcome.getResult()), params.getRequestId()); - - future.complete(outcome); - }; - } - - /** - * Determines the priority of an outcome based on its result. - * - * @param outcome outcome to examine, or {@code null} - * @return the outcome's priority - */ - protected int detmPriority(OperationOutcome outcome) { - if (outcome == null) { - return 1; - } - - switch (outcome.getResult()) { - case SUCCESS: - return 0; - - case FAILURE_GUARD: - return 2; - - case FAILURE_RETRIES: - return 3; - - case FAILURE: - return 4; - - case FAILURE_TIMEOUT: - return 5; - - case FAILURE_EXCEPTION: - default: - return 6; - } - } - - /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. - * - * @param params operation parameters - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param outcome outcome of the previous task - * @param tasks tasks to be performed - * @return a function to perform the task. If everything checks out, then it returns - * the task's future. Otherwise, it returns an incomplete future and completes - * the controller instead. - */ - // @formatter:off - protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params, - PipelineControllerFuture<OperationOutcome> controller, - boolean checkSuccess, OperationOutcome outcome, - CompletableFuture<OperationOutcome> task) { - // @formatter:on - - if (checkSuccess && !isSuccess(outcome)) { - /* - * must complete before canceling so that cancel() doesn't cause controller to - * complete - */ - controller.complete(outcome); - task.cancel(false); - return new CompletableFuture<>(); - } - - return controller.wrap(task); - } - - /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. - * - * @param params operation parameters - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param tasks tasks to be performed - * @return a function to perform the task. If everything checks out, then it returns - * the task's future. Otherwise, it returns an incomplete future and completes - * the controller instead. - */ - // @formatter:off - protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params, - PipelineControllerFuture<OperationOutcome> controller, - boolean checkSuccess, - Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) { - // @formatter:on - - return outcome -> { - - if (!controller.isRunning()) { - return new CompletableFuture<>(); - } - - if (checkSuccess && !isSuccess(outcome)) { - controller.complete(outcome); - return new CompletableFuture<>(); - } - - return controller.wrap(task.apply(outcome)); - }; - } - - /** - * Sets the start time of the operation and invokes the callback to indicate that the - * operation has started. Does nothing if the pipeline has been stopped. - * <p/> - * This assumes that the "outcome" is not {@code null}. - * - * @param params operation parameters - * @param callbacks used to determine if the start callback can be invoked - * @return a function that sets the start time and invokes the callback - */ - private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params, - CallbackManager callbacks) { - - return (outcome, thrown) -> { - - if (callbacks.canStart()) { - // haven't invoked "start" callback yet - outcome.setStart(callbacks.getStartTime()); - outcome.setEnd(null); - params.callbackStarted(outcome); - } - }; - } - - /** - * Sets the end time of the operation and invokes the callback to indicate that the - * operation has completed. Does nothing if the pipeline has been stopped. - * <p/> - * This assumes that the "outcome" is not {@code null}. - * <p/> - * Note: the start time must be a reference rather than a plain value, because it's - * value must be gotten on-demand, when the returned function is executed at a later - * time. - * - * @param params operation parameters - * @param callbacks used to determine if the end callback can be invoked - * @return a function that sets the end time and invokes the callback - */ - private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params, - CallbackManager callbacks) { - - return (outcome, thrown) -> { - - if (callbacks.canEnd()) { - outcome.setStart(callbacks.getStartTime()); - outcome.setEnd(callbacks.getEndTime()); - params.callbackCompleted(outcome); - } - }; - } - - /** - * Sets an operation's outcome and message, based on a throwable. - * - * @param params operation parameters - * @param operation operation to be updated - * @return the updated operation - */ - protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation, - Throwable thrown) { - PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); - return setOutcome(params, operation, result); - } - - /** - * Sets an operation's outcome and default message based on the result. - * - * @param params operation parameters - * @param operation operation to be updated - * @param result result of the operation - * @return the updated operation - */ - protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation, - PolicyResult result) { - logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); - operation.setResult(result); - operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG - : ControlLoopOperation.FAILED_MSG); - - return operation; - } - - /** - * Determines if a throwable is due to a timeout. - * - * @param thrown throwable of interest - * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise - */ - protected boolean isTimeout(Throwable thrown) { - if (thrown instanceof CompletionException) { - thrown = thrown.getCause(); - } - - return (thrown instanceof TimeoutException); - } - - // these may be overridden by junit tests - - /** - * Gets the operation timeout. Subclasses may override this method to obtain the - * timeout in some other way (e.g., through configuration properties). - * - * @param timeoutSec timeout, in seconds, or {@code null} - * @return the operation timeout, in milliseconds - */ - protected long getTimeOutMillis(Integer timeoutSec) { - return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); - } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java index 57fce40d7..925916097 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/ControlLoopOperationParams.java @@ -148,7 +148,8 @@ public class ControlLoopOperationParams { return actorService .getActor(getActor()) .getOperator(getOperation()) - .startOperation(this); + .buildOperation(this) + .start(); // @formatter:on } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java index da4fb4f0c..275c8bc4e 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java @@ -51,7 +51,7 @@ public class HttpActorParams { * indicates that it should wait forever. The default is zero. */ @Min(0) - private long timeoutSec = 0; + private int timeoutSec = 0; /** * Maps the operation name to its URI path. diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java index 695ffe4dd..93711c032 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java @@ -53,7 +53,7 @@ public class HttpParams { */ @Min(0) @Builder.Default - private long timeoutSec = 0; + private int timeoutSec = 0; /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java index 620950a3c..53bee5f00 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/spi/Actor.java @@ -22,7 +22,6 @@ package org.onap.policy.controlloop.actorserviceprovider.spi; import java.util.Collection; - import java.util.List; import java.util.Map; import java.util.Set; |