From 28ca4d196bb0e8c50ad64b5bfde117a93ace3e04 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Fri, 14 Feb 2020 14:22:48 -0500 Subject: Use BidirectionalTopicClient from policy-common Also modified "target" to sink in various places, and renamed various uses of "pair" to "bidirectional" (e.g., TopicPairParams => BidirectionalTopicParams). Also replaced MyExec with PseudoExecutor, from policy-common. As part of this, extracted the logRequest and logResponse methods from the Http and Topic classes, moving them into the common OperationPartial class. Modified A&AI, SDNC junit tests to use PseudoExecutor. Added support for incomplete responses on Topics, where multiple responses may be received for one request Fixed a duplicate entry in actor.aai pom. As the changes were already big enough, went ahead and also did the following to support the APPC Actor: - Reorganized parameter classes and content. - Modified anyOf, allOf to take functions instead of futures and handle exceptions thrown by any of the functions. Also added sequence() method. - Deleted doTask. - Modified ActorService.config to take a map of maps, not just a map. - Decided NOT to move anyOf, allOf, and sequence from OperationPartial to a utility class, because they depend on "params". Issue-ID: POLICY-2363 Signed-off-by: Jim Hahn Change-Id: I5a8bae05dfef22fe71c57c58f265b9dac20df5c5 --- .../actorserviceprovider/ActorService.java | 6 +- .../impl/BidirectionalTopicActor.java | 108 ++++++ .../impl/BidirectionalTopicOperation.java | 253 ++++++++++++++ .../impl/BidirectionalTopicOperator.java | 160 +++++++++ .../actorserviceprovider/impl/HttpOperation.java | 68 +--- .../impl/OperationPartial.java | 386 ++++++++++++++------- .../actorserviceprovider/impl/TopicPairActor.java | 112 ------ .../impl/TopicPairOperation.java | 316 ----------------- .../impl/TopicPairOperator.java | 156 --------- .../parameters/BidirectionalTopicActorParams.java | 57 +++ .../parameters/BidirectionalTopicParams.java | 70 ++++ .../parameters/CommonActorParams.java | 102 ++++++ .../parameters/HttpActorParams.java | 94 +---- .../parameters/HttpParams.java | 11 +- .../parameters/TopicPairActorParams.java | 93 ----- .../parameters/TopicPairParams.java | 68 ---- .../topic/BidirectionalTopicHandler.java | 79 +++++ .../topic/BidirectionalTopicManager.java | 37 ++ .../actorserviceprovider/topic/Forwarder.java | 21 +- .../topic/TopicListenerImpl.java | 2 +- .../actorserviceprovider/topic/TopicPair.java | 122 ------- .../topic/TopicPairManager.java | 37 -- 22 files changed, 1170 insertions(+), 1188 deletions(-) create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java delete mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java delete mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java delete mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java delete mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java delete mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java create mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java delete mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java delete mode 100644 models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java (limited to 'models-interactions/model-actors/actorServiceProvider/src/main') diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java index 2886b1feb..24c2cfc23 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; * {@link #start()} to start all of the actors. When finished using the actor service, * invoke {@link #stop()} or {@link #shutdown()}. */ -public class ActorService extends StartConfigPartial> { +public class ActorService extends StartConfigPartial>> { private static final Logger logger = LoggerFactory.getLogger(ActorService.class); private final Map name2actor; @@ -116,14 +116,14 @@ public class ActorService extends StartConfigPartial> { } @Override - protected void doConfigure(Map parameters) { + protected void doConfigure(Map> parameters) { logger.info("configuring actors"); BeanValidationResult valres = new BeanValidationResult("ActorService", parameters); for (Actor actor : name2actor.values()) { String actorName = actor.getName(); - Map subparams = Util.translateToMap(actorName, parameters.get(actorName)); + Map subparams = parameters.get(actorName); if (subparams != null) { diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java new file mode 100644 index 000000000..1e44a170c --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager; + +/** + * Actor that uses a bidirectional topic. The actor's parameters must be a + * {@link BidirectionalTopicActorParams}. + */ +public class BidirectionalTopicActor extends ActorImpl implements BidirectionalTopicManager { + + /** + * Maps a pair of sink and source topic names to their bidirectional topic. + */ + private final Map, BidirectionalTopicHandler> params2topic = new ConcurrentHashMap<>(); + + + /** + * Constructs the object. + * + * @param name actor's name + */ + public BidirectionalTopicActor(String name) { + super(name); + } + + @Override + protected void doStart() { + params2topic.values().forEach(BidirectionalTopicHandler::start); + super.doStart(); + } + + @Override + protected void doStop() { + params2topic.values().forEach(BidirectionalTopicHandler::stop); + super.doStop(); + } + + @Override + protected void doShutdown() { + params2topic.values().forEach(BidirectionalTopicHandler::shutdown); + params2topic.clear(); + super.doShutdown(); + } + + @Override + public BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic) { + Pair key = Pair.of(sinkTopic, sourceTopic); + + return params2topic.computeIfAbsent(key, pair -> { + try { + return makeTopicHandler(sinkTopic, sourceTopic); + } catch (BidirectionalTopicClientException e) { + throw new IllegalArgumentException(e); + } + }); + } + + /** + * Translates the parameters to a {@link BidirectionalTopicActorParams} and then + * creates a function that will extract operator-specific parameters. + */ + @Override + protected Function> makeOperatorParameters(Map actorParameters) { + String actorName = getName(); + + // @formatter:off + return Util.translate(actorName, actorParameters, BidirectionalTopicActorParams.class) + .doValidation(actorName) + .makeOperationParameters(actorName); + // @formatter:on + } + + // may be overridden by junit tests + + protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic) + throws BidirectionalTopicClientException { + + return new BidirectionalTopicHandler(sinkTopic, sourceTopic); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java new file mode 100644 index 000000000..f82015d6b --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java @@ -0,0 +1,253 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import lombok.Getter; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; +import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Operation that uses a bidirectional topic. + * + * @param response type + */ +@Getter +public abstract class BidirectionalTopicOperation extends OperationPartial { + private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class); + + /** + * Response status. + */ + public enum Status { + SUCCESS, FAILURE, STILL_WAITING + } + + // fields extracted from the operator + + private final BidirectionalTopicHandler topicHandler; + private final Forwarder forwarder; + private final BidirectionalTopicParams topicParams; + private final long timeoutMs; + + /** + * Response class. + */ + private final Class responseClass; + + + /** + * Constructs the object. + * + * @param params operation parameters + * @param operator operator that created this operation + * @param clazz response class + */ + public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator, + Class clazz) { + super(params, operator); + this.topicHandler = operator.getTopicHandler(); + this.forwarder = operator.getForwarder(); + this.topicParams = operator.getParams(); + this.responseClass = clazz; + this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS); + } + + /** + * If no timeout is specified, then it returns the default timeout. + */ + @Override + protected long getTimeoutMs(Integer timeoutSec) { + // TODO move this method to the superclass + return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec)); + } + + /** + * Publishes the request and arranges to receive the response. + */ + @Override + protected CompletableFuture startOperationAsync(int attempt, OperationOutcome outcome) { + + final Q request = makeRequest(attempt); + final List expectedKeyValues = getExpectedKeyValues(attempt, request); + + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); + final Executor executor = params.getExecutor(); + + // register a listener BEFORE publishing + + BiConsumer listener = (rawResponse, scoResponse) -> { + OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse); + if (latestOutcome != null) { + // final response - complete the controller + controller.completeAsync(() -> latestOutcome, executor); + } + }; + + forwarder.register(expectedKeyValues, listener); + + // ensure listener is unregistered if the controller is canceled + controller.add(() -> forwarder.unregister(expectedKeyValues, listener)); + + // publish the request + try { + publishRequest(request); + } catch (RuntimeException e) { + logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId()); + forwarder.unregister(expectedKeyValues, listener); + throw e; + } + + return controller; + } + + /** + * Makes the request. + * + * @param attempt operation attempt + * @return a new request + */ + protected abstract Q makeRequest(int attempt); + + /** + * Gets values, expected in the response, that should match the selector keys. + * + * @param attempt operation attempt + * @param request request to be published + * @return a list of the values to be matched by the selector keys + */ + protected abstract List getExpectedKeyValues(int attempt, Q request); + + /** + * Publishes the request. Encodes the request, if it is not already a String. + * + * @param request request to be published + */ + protected void publishRequest(Q request) { + String json; + try { + if (request instanceof String) { + json = request.toString(); + } else { + json = makeCoder().encode(request); + } + } catch (CoderException e) { + throw new IllegalArgumentException("cannot encode request", e); + } + + if (!topicHandler.send(json)) { + throw new IllegalStateException("nothing published"); + } + + logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request); + } + + /** + * Processes a response. + * + * @param infra communication infrastructure on which the response was received + * @param outcome outcome to be populated + * @param response raw response to process + * @param scoResponse response, as a {@link StandardCoderObject} + * @return the outcome, or {@code null} if still waiting for completion + */ + protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse, + StandardCoderObject scoResponse) { + + logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); + + logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(), + rawResponse); + + // decode the response + S response; + if (responseClass == String.class) { + response = responseClass.cast(rawResponse); + + } else if (responseClass == StandardCoderObject.class) { + response = responseClass.cast(scoResponse); + + } else { + try { + response = makeCoder().decode(rawResponse, responseClass); + } catch (CoderException e) { + logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + throw new IllegalArgumentException("cannot decode response", e); + } + } + + // check its status + switch (detmStatus(rawResponse, response)) { + case SUCCESS: + logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + setOutcome(outcome, PolicyResult.SUCCESS); + postProcessResponse(outcome, rawResponse, response); + return outcome; + + case FAILURE: + logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + return setOutcome(outcome, PolicyResult.FAILURE); + + case STILL_WAITING: + default: + logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + return null; + } + } + + /** + * Processes a successful response. + * + * @param outcome outcome to be populated + * @param rawResponse raw response + * @param response decoded response + */ + protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) { + // do nothing + } + + /** + * Determines the status of the response. + * + * @param rawResponse raw response + * @param response decoded response + * @return the status of the response + */ + protected abstract Status detmStatus(String rawResponse, S response); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java new file mode 100644 index 000000000..51689e49b --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java @@ -0,0 +1,160 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import lombok.Getter; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager; +import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; +import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey; + +/** + * Operator that uses a bidirectional topic. Topic operators may share a + * {@link BidirectionalTopicHandler}. + */ +public abstract class BidirectionalTopicOperator extends OperatorPartial { + + /** + * Manager from which to get the topic handlers. + */ + private final BidirectionalTopicManager topicManager; + + /** + * Keys used to extract the fields used to select responses for this operator. + */ + private final List selectorKeys; + + /* + * The remaining fields are initialized when configure() is invoked, thus they may + * change. + */ + + /** + * Current parameters. While {@link params} may change, the values contained within it + * will not, thus operations may copy it. + */ + @Getter + private BidirectionalTopicParams params; + + /** + * Topic handler associated with the parameters. + */ + @Getter + private BidirectionalTopicHandler topicHandler; + + /** + * Forwarder associated with the parameters. + */ + @Getter + private Forwarder forwarder; + + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + * @param topicManager manager from which to get the topic handler + * @param selectorKeys keys used to extract the fields used to select responses for + * this operator + */ + public BidirectionalTopicOperator(String actorName, String name, BidirectionalTopicManager topicManager, + List selectorKeys) { + super(actorName, name); + this.topicManager = topicManager; + this.selectorKeys = selectorKeys; + } + + @Override + protected void doConfigure(Map parameters) { + params = Util.translate(getFullName(), parameters, BidirectionalTopicParams.class); + ValidationResult result = params.validate(getFullName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + topicHandler = topicManager.getTopicHandler(params.getSinkTopic(), params.getSourceTopic()); + forwarder = topicHandler.addForwarder(selectorKeys); + } + + /** + * Makes an operator that will construct operations. + * + * @param request type + * @param response type + * @param actorName actor name + * @param operation operation name + * @param topicManager manager from which to get the topic handler + * @param operationMaker function to make an operation + * @param keys keys used to extract the fields used to select responses for this + * operator + * @return a new operator + */ + // @formatter:off + public static BidirectionalTopicOperator makeOperator(String actorName, String operation, + BidirectionalTopicManager topicManager, + BiFunction> operationMaker, + SelectorKey... keys) { + // @formatter:off + + return makeOperator(actorName, operation, topicManager, Arrays.asList(keys), operationMaker); + } + + /** + * Makes an operator that will construct operations. + * + * @param request type + * @param response type + * @param actorName actor name + * @param operation operation name + * @param topicManager manager from which to get the topic handler + * @param keys keys used to extract the fields used to select responses for + * this operator + * @param operationMaker function to make an operation + * @return a new operator + */ + // @formatter:off + public static BidirectionalTopicOperator makeOperator(String actorName, String operation, + BidirectionalTopicManager topicManager, + List keys, + BiFunction> operationMaker) { + // @formatter:on + + return new BidirectionalTopicOperator(actorName, operation, topicManager, keys) { + @Override + public synchronized Operation buildOperation(ControlLoopOperationParams params) { + return operationMaker.apply(params, this); + } + }; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java index ba75f0be6..f1829d79a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java @@ -33,9 +33,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; -import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; @@ -52,7 +50,6 @@ import org.slf4j.LoggerFactory; @Getter public abstract class HttpOperation extends OperationPartial { private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class); - private static final Coder coder = new StandardCoder(); /** * Operator that created this operation. @@ -171,7 +168,7 @@ public abstract class HttpOperation extends OperationPartial { String strResponse = HttpClient.getBody(rawResponse, String.class); - logRestResponse(url, strResponse); + logMessage(EventType.IN, CommInfrastructure.REST, url, strResponse); T response; if (responseClass == String.class) { @@ -224,63 +221,10 @@ public abstract class HttpOperation extends OperationPartial { return (rawResponse.getStatus() == 200); } - /** - * Logs a REST request. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param url request URL - * @param request request to be logged - */ - public void logRestRequest(String url, Q request) { - String json; - try { - if (request == null) { - json = null; - } else if (request instanceof String) { - json = request.toString(); - } else { - json = makeCoder().encode(request, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print request", e); - json = request.toString(); - } - - NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); - logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); - } - - /** - * Logs a REST response. If the response is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param url request URL - * @param response response to be logged - */ - public void logRestResponse(String url, S response) { - String json; - try { - if (response == null) { - json = null; - } else if (response instanceof String) { - json = response.toString(); - } else { - json = makeCoder().encode(response, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print response", e); - json = response.toString(); - } - - NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); - logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); - } - - // these may be overridden by junit tests - - protected Coder makeCoder() { - return coder; + @Override + public String logMessage(EventType direction, CommInfrastructure infra, String sink, Q request) { + String json = super.logMessage(direction, infra, sink, request); + NetLoggerUtil.log(direction, infra, sink, json); + return json; } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java index d00b88bb5..0b3497197 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -20,7 +20,12 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -28,6 +33,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; import org.onap.policy.controlloop.actorserviceprovider.Operation; @@ -53,8 +66,8 @@ import org.slf4j.LoggerFactory; * be done to cancel that particular operation. */ public abstract class OperationPartial implements Operation { - private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class); + private static final Coder coder = new StandardCoder(); public static final long DEFAULT_RETRY_WAIT_MS = 1000L; // values extracted from the operator @@ -470,103 +483,110 @@ public abstract class OperationPartial implements Operation { * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels * any outstanding futures when one completes. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled */ - protected CompletableFuture anyOf(List> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + protected CompletableFuture anyOf( + @SuppressWarnings("unchecked") Supplier>... futureMakers) { - @SuppressWarnings("unchecked") - CompletableFuture result = anyOf(arrFutures); - return result; + return anyOf(Arrays.asList(futureMakers)); } /** - * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any - * outstanding futures when one completes. + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled. Similarly, when this future completes, any incomplete futures + * will be canceled */ protected CompletableFuture anyOf( - @SuppressWarnings("unchecked") CompletableFuture... futures) { + List>> futureMakers) { + + PipelineControllerFuture controller = new PipelineControllerFuture<>(); + + CompletableFuture[] futures = + attachFutures(controller, futureMakers, UnaryOperator.identity()); + + if (futures.length == 0) { + // no futures were started + return null; + } if (futures.length == 1) { return futures[0]; } - final Executor executor = params.getExecutor(); - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); - - // @formatter:off - CompletableFuture.anyOf(futures) - .thenApply(object -> (OperationOutcome) object) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on + CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome) + .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); return controller; } /** - * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels - * the futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled */ - protected CompletableFuture allOf(List> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + protected CompletableFuture allOf( + @SuppressWarnings("unchecked") Supplier>... futureMakers) { - @SuppressWarnings("unchecked") - CompletableFuture result = allOf(arrFutures); - return result; + return allOf(Arrays.asList(futureMakers)); } /** - * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the - * futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled. Similarly, when this future completes, any incomplete futures + * will be canceled */ protected CompletableFuture allOf( - @SuppressWarnings("unchecked") CompletableFuture... futures) { - - if (futures.length == 1) { - return futures[0]; - } + List>> futureMakers) { + PipelineControllerFuture controller = new PipelineControllerFuture<>(); - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); + Queue outcomes = new LinkedList<>(); - OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + CompletableFuture[] futures = + attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> { + synchronized (outcomes) { + outcomes.add(outcome); + } + return outcome; + })); - @SuppressWarnings("rawtypes") - CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + if (futures.length == 0) { + // no futures were started + return null; + } - // record the outcomes of each future when it completes - for (int count = 0; count < futures2.length; ++count) { - final int count2 = count; - futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); + if (futures.length == 1) { + return futures[0]; } // @formatter:off - CompletableFuture.allOf(futures2) + CompletableFuture.allOf(futures) .thenApply(unused -> combineOutcomes(outcomes)) .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); // @formatter:on @@ -575,22 +595,62 @@ public abstract class OperationPartial implements Operation { } /** - * Attaches the given futures to the controller. + * Invokes the functions to create the futures and attaches them to the controller. * * @param controller master controller for all of the futures - * @param futures futures to be attached to the controller - */ - private void attachFutures(PipelineControllerFuture controller, - @SuppressWarnings("unchecked") CompletableFuture... futures) { + * @param futureMakers futures to be attached to the controller + * @param adorn function that "adorns" the future, possible adding onto its pipeline. + * Returns the adorned future + * @return an array of futures, possibly zero-length. If the array is of size one, + * then that one item should be returned instead of the controller + */ + private CompletableFuture[] attachFutures(PipelineControllerFuture controller, + List>> futureMakers, + UnaryOperator> adorn) { + + if (futureMakers.isEmpty()) { + @SuppressWarnings("unchecked") + CompletableFuture[] result = new CompletableFuture[0]; + return result; + } - if (futures.length == 0) { - throw new IllegalArgumentException("empty list of futures"); + // the last, unadorned future that is created + CompletableFuture lastFuture = null; + + List> futures = new ArrayList<>(futureMakers.size()); + + // make each future + for (var maker : futureMakers) { + try { + CompletableFuture future = maker.get(); + if (future == null) { + continue; + } + + // propagate "stop" to the future + controller.add(future); + + futures.add(adorn.apply(future)); + + lastFuture = future; + + } catch (RuntimeException e) { + logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId()); + controller.cancel(false); + throw e; + } } - // attach each task - for (CompletableFuture future : futures) { - controller.add(future); + @SuppressWarnings("unchecked") + CompletableFuture[] result = new CompletableFuture[futures.size()]; + + if (result.length == 1) { + // special case - return the unadorned future + result[0] = lastFuture; + return result; } + + return futures.toArray(result); } /** @@ -599,15 +659,13 @@ public abstract class OperationPartial implements Operation { * @param outcomes outcomes to be examined * @return the combined outcome */ - private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) { + private OperationOutcome combineOutcomes(Queue outcomes) { // identify the outcome with the highest priority - OperationOutcome outcome = outcomes[0]; + OperationOutcome outcome = outcomes.remove(); int priority = detmPriority(outcome); - // start with "1", as we've already dealt with "0" - for (int count = 1; count < outcomes.length; ++count) { - OperationOutcome outcome2 = outcomes[count]; + for (OperationOutcome outcome2 : outcomes) { int priority2 = detmPriority(outcome2); if (priority2 > priority) { @@ -656,71 +714,113 @@ public abstract class OperationPartial implements Operation { } /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. + * Performs a sequence of tasks, stopping if a task fails. A given task's future is + * not created until the previous task completes. The pipeline returns the outcome of + * the last task executed. * - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param outcome outcome of the previous task - * @param task task to be performed - * @return the task, if everything checks out. Otherwise, it returns an incomplete - * future and completes the controller instead + * @param futureMakers functions to make the futures + * @return a future to cancel the sequence or await the outcome */ - // @formatter:off - protected CompletableFuture doTask( - PipelineControllerFuture controller, - boolean checkSuccess, OperationOutcome outcome, - CompletableFuture task) { - // @formatter:on + protected CompletableFuture sequence( + @SuppressWarnings("unchecked") Supplier>... futureMakers) { - if (checkSuccess && !isSuccess(outcome)) { - /* - * must complete before canceling so that cancel() doesn't cause controller to - * complete - */ - controller.complete(outcome); - task.cancel(false); - return new CompletableFuture<>(); + return sequence(Arrays.asList(futureMakers)); + } + + /** + * Performs a sequence of tasks, stopping if a task fails. A given task's future is + * not created until the previous task completes. The pipeline returns the outcome of + * the last task executed. + * + * @param futureMakers functions to make the futures + * @return a future to cancel the sequence or await the outcome, or {@code null} if + * there were no tasks to perform + */ + protected CompletableFuture sequence( + List>> futureMakers) { + + Queue>> queue = new ArrayDeque<>(futureMakers); + + CompletableFuture nextTask = getNextTask(queue); + if (nextTask == null) { + // no tasks + return null; + } + + if (queue.isEmpty()) { + // only one task - just return it rather than wrapping it in a controller + return nextTask; } - return controller.wrap(task); + /* + * multiple tasks - need a controller to stop whichever task is currently + * executing + */ + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); + final Executor executor = params.getExecutor(); + + // @formatter:off + controller.wrap(nextTask) + .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; } /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. + * Executes the next task in the queue, if the previous outcome was successful. * - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param task function to start the task to be performed - * @return a function to perform the task. If everything checks out, then it returns - * the task. Otherwise, it returns an incomplete future and completes the - * controller instead + * @param controller pipeline controller + * @param taskQueue queue of tasks to be performed + * @return a future to execute the remaining tasks, or the current outcome, if it's a + * failure, or if there are no more tasks */ - // @formatter:off - protected Function> doTask( + private Function> nextTaskOnSuccess( PipelineControllerFuture controller, - boolean checkSuccess, - Function> task) { - // @formatter:on + Queue>> taskQueue) { return outcome -> { - - if (!controller.isRunning()) { - return new CompletableFuture<>(); + if (!isSuccess(outcome)) { + // return the failure + return CompletableFuture.completedFuture(outcome); } - if (checkSuccess && !isSuccess(outcome)) { - controller.complete(outcome); - return new CompletableFuture<>(); + CompletableFuture nextTask = getNextTask(taskQueue); + if (nextTask == null) { + // no tasks - just return the success + return CompletableFuture.completedFuture(outcome); } - return controller.wrap(task.apply(outcome)); + // @formatter:off + return controller + .wrap(nextTask) + .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor()); + // @formatter:on }; } + /** + * Gets the next task from the queue, skipping those that are {@code null}. + * + * @param taskQueue task queue + * @return the next task, or {@code null} if the queue is now empty + */ + private CompletableFuture getNextTask( + Queue>> taskQueue) { + + Supplier> maker; + + while ((maker = taskQueue.poll()) != null) { + CompletableFuture future = maker.get(); + if (future != null) { + return future; + } + } + + return null; + } + /** * Sets the start time of the operation and invokes the callback to indicate that the * operation has started. Does nothing if the pipeline has been stopped. @@ -809,6 +909,38 @@ public abstract class OperationPartial implements Operation { return (thrown instanceof TimeoutException); } + /** + * Logs a response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param direction IN or OUT + * @param infra communication infrastructure on which it was published + * @param source source name (e.g., the URL or Topic name) + * @param response response to be logged + * @return the JSON text that was logged + */ + public String logMessage(EventType direction, CommInfrastructure infra, String source, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = makeCoder().encode(response, true); + } + + } catch (CoderException e) { + String type = (direction == EventType.IN ? "response" : "request"); + logger.warn("cannot pretty-print {}", type, e); + json = response.toString(); + } + + logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json); + + return json; + } + // these may be overridden by subclasses or junit tests /** @@ -841,4 +973,10 @@ public abstract class OperationPartial implements Operation { protected long getTimeoutMs(Integer timeoutSec) { return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); } + + // these may be overridden by junit tests + + protected Coder makeCoder() { + return coder; + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java deleted file mode 100644 index c3e1e5c4d..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java +++ /dev/null @@ -1,112 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.impl; - -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; -import org.apache.commons.lang3.tuple.Pair; -import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.controlloop.actorserviceprovider.Util; -import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; - -/** - * Actor that uses a topic pair. The actor's parameters must be a - * {@link TopicPairActorParams}. - */ -public class TopicPairActor extends ActorImpl implements TopicPairManager { - - /** - * Maps a topic source and target name to its topic pair. - */ - private final Map, TopicPair> params2topic = new ConcurrentHashMap<>(); - - - /** - * Constructs the object. - * - * @param name actor's name - */ - public TopicPairActor(String name) { - super(name); - } - - @Override - protected void doStart() { - params2topic.values().forEach(TopicPair::start); - super.doStart(); - } - - @Override - protected void doStop() { - params2topic.values().forEach(TopicPair::stop); - super.doStop(); - } - - @Override - protected void doShutdown() { - params2topic.values().forEach(TopicPair::shutdown); - params2topic.clear(); - super.doShutdown(); - } - - @Override - public TopicPair getTopicPair(String source, String target) { - Pair key = Pair.of(source, target); - return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target)); - } - - /** - * Translates the parameters to a {@link TopicPairActorParams} and then creates a - * function that will extract operator-specific parameters. - */ - @Override - protected Function> makeOperatorParameters(Map actorParameters) { - String actorName = getName(); - - TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class); - ValidationResult result = params.validate(getName()); - if (!result.isValid()) { - throw new ParameterValidationRuntimeException("invalid parameters", result); - } - - // create a map of the default parameters - Map defaultParams = Util.translateToMap(getName(), params.getDefaults()); - Map> operations = params.getOperation(); - - return operationName -> { - Map specificParams = operations.get(operationName); - if (specificParams == null) { - return null; - } - - // start with a copy of defaults and overlay with specific - Map subparams = new TreeMap<>(defaultParams); - subparams.putAll(specificParams); - - return Util.translateToMap(getName() + "." + operationName, subparams); - }; - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java deleted file mode 100644 index 6b584d7c6..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java +++ /dev/null @@ -1,316 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.impl; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import lombok.Getter; -import org.apache.commons.lang3.tuple.Triple; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.NetLoggerUtil; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; -import org.onap.policy.common.utils.coder.Coder; -import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; -import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; -import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; -import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; -import org.onap.policy.controlloop.policy.PolicyResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Operation that uses a Topic pair. - * - * @param response type - */ -@Getter -public abstract class TopicPairOperation extends OperationPartial { - private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class); - private static final Coder coder = new StandardCoder(); - - // fields extracted from the operator - - private final TopicPair topicPair; - private final Forwarder forwarder; - private final TopicPairParams pairParams; - private final long timeoutMs; - - /** - * Response class. - */ - private final Class responseClass; - - - /** - * Constructs the object. - * - * @param params operation parameters - * @param operator operator that created this operation - * @param clazz response class - */ - public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class clazz) { - super(params, operator); - this.topicPair = operator.getTopicPair(); - this.forwarder = operator.getForwarder(); - this.pairParams = operator.getParams(); - this.responseClass = clazz; - this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS); - } - - /** - * If no timeout is specified, then it returns the default timeout. - */ - @Override - protected long getTimeoutMs(Integer timeoutSec) { - // TODO move this method to the superclass - return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec)); - } - - /** - * Publishes the request and arranges to receive the response. - */ - @Override - protected CompletableFuture startOperationAsync(int attempt, OperationOutcome outcome) { - - final Q request = makeRequest(attempt); - final List expectedKeyValues = getExpectedKeyValues(attempt, request); - - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - final CompletableFuture> future = - new CompletableFuture<>(); - final Executor executor = params.getExecutor(); - - // register a listener BEFORE publishing - - // @formatter:off - TriConsumer listener = - (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse)); - // @formatter:on - - // TODO this currently only allows a single matching response - - forwarder.register(expectedKeyValues, listener); - - // ensure listener is unregistered if the controller is canceled - controller.add(() -> forwarder.unregister(expectedKeyValues, listener)); - - // publish the request - try { - publishRequest(request); - } catch (RuntimeException e) { - logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId()); - forwarder.unregister(expectedKeyValues, listener); - throw e; - } - - - // once "future" completes, process the response, and then complete the controller - - // @formatter:off - future.thenApplyAsync( - triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()), - executor) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on - - return controller; - } - - /** - * Makes the request. - * - * @param attempt operation attempt - * @return a new request - */ - protected abstract Q makeRequest(int attempt); - - /** - * Gets values, expected in the response, that should match the selector keys. - * - * @param attempt operation attempt - * @param request request to be published - * @return a list of the values to be matched by the selector keys - */ - protected abstract List getExpectedKeyValues(int attempt, Q request); - - /** - * Publishes the request. Encodes the request, if it is not already a String. - * - * @param request request to be published - */ - protected void publishRequest(Q request) { - String json; - try { - if (request instanceof String) { - json = request.toString(); - } else { - json = makeCoder().encode(request); - } - } catch (CoderException e) { - throw new IllegalArgumentException("cannot encode request", e); - } - - List list = topicPair.publish(json); - if (list.isEmpty()) { - throw new IllegalStateException("nothing published"); - } - - logTopicRequest(list, request); - } - - /** - * Processes a response. - * - * @param infra communication infrastructure on which the response was received - * @param outcome outcome to be populated - * @param response raw response to process - * @param scoResponse response, as a {@link StandardCoderObject} - * @return the outcome - */ - protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse, - StandardCoderObject scoResponse) { - - logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); - - logTopicResponse(infra, rawResponse); - - S response; - if (responseClass == String.class) { - response = responseClass.cast(rawResponse); - - } else if (responseClass == StandardCoderObject.class) { - response = responseClass.cast(scoResponse); - - } else { - try { - response = makeCoder().decode(rawResponse, responseClass); - } catch (CoderException e) { - logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(), - params.getRequestId()); - throw new IllegalArgumentException("cannot decode response", e); - } - } - - if (!isSuccess(rawResponse, response)) { - logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), - params.getRequestId()); - return setOutcome(outcome, PolicyResult.FAILURE); - } - - logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); - setOutcome(outcome, PolicyResult.SUCCESS); - postProcessResponse(outcome, rawResponse, response); - - return outcome; - } - - /** - * Processes a successful response. - * - * @param outcome outcome to be populated - * @param rawResponse raw response - * @param response decoded response - */ - protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) { - // do nothing - } - - /** - * Determines if the response indicates success. - * - * @param rawResponse raw response - * @param response decoded response - * @return {@code true} if the response indicates success, {@code false} otherwise - */ - protected abstract boolean isSuccess(String rawResponse, S response); - - /** - * Logs a TOPIC request. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param infrastructures list of communication infrastructures on which it was - * published - * @param request request to be logged - */ - protected void logTopicRequest(List infrastructures, Q request) { - if (infrastructures.isEmpty()) { - return; - } - - String json; - try { - if (request == null) { - json = null; - } else if (request instanceof String) { - json = request.toString(); - } else { - json = makeCoder().encode(request, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print request", e); - json = request.toString(); - } - - for (CommInfrastructure infra : infrastructures) { - logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json); - } - } - - /** - * Logs a TOPIC response. If the response is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param infra communication infrastructure on which the response was received - * @param response response to be logged - */ - protected void logTopicResponse(CommInfrastructure infra, T response) { - String json; - try { - if (response == null) { - json = null; - } else if (response instanceof String) { - json = response.toString(); - } else { - json = makeCoder().encode(response, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print response", e); - json = response.toString(); - } - - logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json); - } - - // these may be overridden by junit tests - - protected Coder makeCoder() { - return coder; - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java deleted file mode 100644 index 8ce013388..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java +++ /dev/null @@ -1,156 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.impl; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.function.BiFunction; -import lombok.Getter; -import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.controlloop.actorserviceprovider.Operation; -import org.onap.policy.controlloop.actorserviceprovider.Util; -import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; -import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; -import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; - -/** - * Operator that uses a pair of topics, one for publishing the request, and another for - * receiving the response. Topic operators may share a {@link TopicPair}. - */ -public abstract class TopicPairOperator extends OperatorPartial { - - /** - * Manager from which to get the topic pair. - */ - private final TopicPairManager pairManager; - - /** - * Keys used to extract the fields used to select responses for this operator. - */ - private final List selectorKeys; - - /* - * The remaining fields are initialized when configure() is invoked, thus they may - * change. - */ - - /** - * Current parameters. While {@link params} may change, the values contained within it - * will not, thus operations may copy it. - */ - @Getter - private TopicPairParams params; - - /** - * Topic pair associated with the parameters. - */ - @Getter - private TopicPair topicPair; - - /** - * Forwarder associated with the parameters. - */ - @Getter - private Forwarder forwarder; - - - /** - * Constructs the object. - * - * @param actorName name of the actor with which this operator is associated - * @param name operation name - * @param pairManager manager from which to get the topic pair - * @param selectorKeys keys used to extract the fields used to select responses for - * this operator - */ - public TopicPairOperator(String actorName, String name, TopicPairManager pairManager, - List selectorKeys) { - super(actorName, name); - this.pairManager = pairManager; - this.selectorKeys = selectorKeys; - } - - @Override - protected void doConfigure(Map parameters) { - params = Util.translate(getFullName(), parameters, TopicPairParams.class); - ValidationResult result = params.validate(getFullName()); - if (!result.isValid()) { - throw new ParameterValidationRuntimeException("invalid parameters", result); - } - - topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget()); - forwarder = topicPair.addForwarder(selectorKeys); - } - - /** - * Makes an operator that will construct operations. - * - * @param request type - * @param response type - * @param actorName actor name - * @param operation operation name - * @param pairManager manager from which to get the topic pair - * @param operationMaker function to make an operation - * @param keys keys used to extract the fields used to select responses for this - * operator - * @return a new operator - */ - // @formatter:off - public static TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, - BiFunction> operationMaker, - SelectorKey... keys) { - // @formatter:off - - return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker); - } - - /** - * Makes an operator that will construct operations. - * - * @param request type - * @param response type - * @param actorName actor name - * @param operation operation name - * @param pairManager manager from which to get the topic pair - * @param keys keys used to extract the fields used to select responses for - * this operator - * @param operationMaker function to make an operation - * @return a new operator - */ - // @formatter:off - public static TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, - List keys, - BiFunction> operationMaker) { - // @formatter:on - - return new TopicPairOperator(actorName, operation, pairManager, keys) { - @Override - public synchronized Operation buildOperation(ControlLoopOperationParams params) { - return operationMaker.apply(params, this); - } - }; - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java new file mode 100644 index 000000000..291aeeb23 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.parameters.annotations.Min; + +/** + * Parameters used by Actors whose Operators use bidirectional topic. + */ +@Getter +@Setter +@EqualsAndHashCode(callSuper = true) +public class BidirectionalTopicActorParams extends CommonActorParams { + + /* + * Optional, default values that are used if missing from the operation-specific + * parameters. + */ + + /** + * Sink topic name to which requests should be published. + */ + private String sinkTopic; + + /** + * Source topic name, from which to read responses. + */ + private String sourceTopic; + + /** + * Amount of time, in seconds, to wait for the HTTP request to complete. The default + * is 90 seconds. + */ + @Min(1) + private int timeoutSec = 90; +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java new file mode 100644 index 000000000..cafca1fa6 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import lombok.Builder; +import lombok.Data; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotBlank; +import org.onap.policy.common.parameters.annotations.NotNull; + +/** + * Parameters used by Operators that use a bidirectional topic. + */ +@NotNull +@NotBlank +@Data +@Builder(toBuilder = true) +public class BidirectionalTopicParams { + + /** + * Sink topic name to which requests should be published. + */ + private String sinkTopic; + + /** + * Source topic name, from which to read responses. + */ + private String sourceTopic; + + /** + * Amount of time, in seconds to wait for the response. + *

+ * Note: this should NOT have a default value, as it receives its default value from + * {@link BidirectionalTopicActorParams}. + */ + @Min(1) + private int timeoutSec; + + + /** + * Validates the parameters. + * + * @param resultName name of the result + * + * @return the validation result + */ + public ValidationResult validate(String resultName) { + return new BeanValidator().validateTop(resultName, this); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java new file mode 100644 index 000000000..dc6f2b657 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java @@ -0,0 +1,102 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Function; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.parameters.annotations.NotNull; +import org.onap.policy.controlloop.actorserviceprovider.Util; + +/** + * Superclass for Actor parameters that have default values in "this" object, and + * operation-specific values in {@link #operation}. + */ +@Getter +@Setter +@EqualsAndHashCode +public class CommonActorParams { + + /** + * Maps the operation name to its parameters. + */ + @NotNull + protected Map> operation; + + + /** + * Extracts a specific operation's parameters from "this". + * + * @param name name of the item containing "this" + * @return a function to extract an operation's parameters from "this". Note: the + * returned function is not thread-safe + */ + public Function> makeOperationParameters(String name) { + + Map defaultParams = Util.translateToMap(name, this); + defaultParams.remove("operation"); + + return operationName -> { + Map specificParams = operation.get(operationName); + if (specificParams == null) { + return null; + } + + // start with a copy of defaults and overlay with specific + Map subparams = new TreeMap<>(defaultParams); + subparams.putAll(specificParams); + + return Util.translateToMap(name + "." + operationName, subparams); + }; + } + + /** + * Validates the parameters. + * + * @param name name of the object containing these parameters + * @return "this" + * @throws IllegalArgumentException if the parameters are invalid + */ + public CommonActorParams doValidation(String name) { + ValidationResult result = validate(name); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + return this; + } + + /** + * Validates the parameters. + * + * @param resultName name of the result + * + * @return the validation result + */ + public ValidationResult validate(String resultName) { + return new BeanValidator().validateTop(resultName, this); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java index 275c8bc4e..d589e1d7e 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java @@ -20,26 +20,23 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; -import java.util.Map; -import java.util.function.Function; -import lombok.Data; -import org.onap.policy.common.parameters.BeanValidationResult; -import org.onap.policy.common.parameters.BeanValidator; -import org.onap.policy.common.parameters.ValidationResult; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotBlank; -import org.onap.policy.common.parameters.annotations.NotNull; -import org.onap.policy.controlloop.actorserviceprovider.Util; /** - * Parameters used by Actors that connect to a server via HTTP. This contains the - * parameters that are common to all of the operations. Only the path changes for each - * operation, thus it includes a mapping from operation name to path. + * Parameters used by Actors that connect to a server via HTTP. */ -@Data -@NotNull -@NotBlank -public class HttpActorParams { +@Getter +@Setter +@EqualsAndHashCode(callSuper = true) +public class HttpActorParams extends CommonActorParams { + + /* + * Optional, default values that are used if missing from the operation-specific + * parameters. + */ /** * Name of the HttpClient, as found in the HttpClientFactory. @@ -47,66 +44,9 @@ public class HttpActorParams { private String clientName; /** - * Amount of time, in seconds to wait for the HTTP request to complete, where zero - * indicates that it should wait forever. The default is zero. - */ - @Min(0) - private int timeoutSec = 0; - - /** - * Maps the operation name to its URI path. - */ - private Map path; - - /** - * Extracts a specific operation's parameters from "this". - * - * @param name name of the item containing "this" - * @return a function to extract an operation's parameters from "this". Note: the - * returned function is not thread-safe - */ - public Function> makeOperationParameters(String name) { - HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build(); - - return operation -> { - String subpath = path.get(operation); - if (subpath == null) { - return null; - } - - subparams.setPath(subpath); - return Util.translateToMap(name + "." + operation, subparams); - }; - } - - /** - * Validates the parameters. - * - * @param name name of the object containing these parameters - * @return "this" - * @throws IllegalArgumentException if the parameters are invalid + * Amount of time, in seconds, to wait for the HTTP request to complete. The default + * is 90 seconds. */ - public HttpActorParams doValidation(String name) { - ValidationResult result = validate(name); - if (!result.isValid()) { - throw new ParameterValidationRuntimeException("invalid parameters", result); - } - - return this; - } - - /** - * Validates the parameters. - * - * @param resultName name of the result - * - * @return the validation result - */ - public ValidationResult validate(String resultName) { - BeanValidationResult result = new BeanValidator().validateTop(resultName, this); - - result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); - - return result; - } + @Min(1) + private int timeoutSec = 90; } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java index 93711c032..2d3ab8b54 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java @@ -48,12 +48,13 @@ public class HttpParams { private String path; /** - * Amount of time, in seconds to wait for the HTTP request to complete, where zero - * indicates that it should wait forever. The default is zero. + * Amount of time, in seconds, to wait for the HTTP request to complete. + *

+ * Note: this should NOT have a default value, as it receives its default value from + * {@link HttpActorParams}. */ - @Min(0) - @Builder.Default - private int timeoutSec = 0; + @Min(1) + private int timeoutSec; /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java deleted file mode 100644 index 42a44ee9c..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java +++ /dev/null @@ -1,93 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.parameters; - -import java.util.Map; -import lombok.Builder; -import lombok.Data; -import org.onap.policy.common.parameters.BeanValidationResult; -import org.onap.policy.common.parameters.BeanValidator; -import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.common.parameters.annotations.NotBlank; -import org.onap.policy.common.parameters.annotations.NotNull; - -/** - * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests - * and the other to receive responses. - */ -@NotNull -@NotBlank -@Data -@Builder -public class TopicPairActorParams { - - /** - * This contains the default parameters that are used when an operation doesn't - * specify them. Note: each operation to be used must still have an entry in - * {@link #operation}, even if it's empty. Otherwise, the given operation will not be - * started. - */ - private TopicPairParams defaults; - - /** - * Maps an operation name to its individual parameters. - */ - private Map> operation; - - - /** - * Validates the parameters. - * - * @param name name of the object containing these parameters - * @return "this" - * @throws IllegalArgumentException if the parameters are invalid - */ - public TopicPairActorParams doValidation(String name) { - ValidationResult result = validate(name); - if (!result.isValid()) { - throw new ParameterValidationRuntimeException("invalid parameters", result); - } - - return this; - } - - /** - * Validates the parameters. - * - * @param resultName name of the result - * - * @return the validation result - */ - public ValidationResult validate(String resultName) { - BeanValidationResult result = new BeanValidator().validateTop(resultName, this); - - if (defaults != null) { - result.addResult(defaults.validate("defaults")); - } - - // @formatter:off - result.validateMap("operation", operation, - (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); - // @formatter:on - - return result; - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java deleted file mode 100644 index 33fcf3052..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java +++ /dev/null @@ -1,68 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.parameters; - -import lombok.Builder; -import lombok.Data; -import org.onap.policy.common.parameters.BeanValidator; -import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotBlank; -import org.onap.policy.common.parameters.annotations.NotNull; - -/** - * Parameters used by Operators that use a pair of Topics, one to publish requests and the - * other to receive responses. - */ -@NotNull -@NotBlank -@Data -@Builder(toBuilder = true) -public class TopicPairParams { - - /** - * Source topic end point, from which to read responses. - */ - private String source; - - /** - * Name of the target topic end point to which requests should be published. - */ - private String target; - - /** - * Amount of time, in seconds to wait for the response. The default is five minutes. - */ - @Min(1) - @Builder.Default - private int timeoutSec = 300; - - /** - * Validates the parameters. - * - * @param resultName name of the result - * - * @return the validation result - */ - public ValidationResult validate(String resultName) { - return new BeanValidator().validateTop(resultName, this); - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java new file mode 100644 index 000000000..30ee1e2d0 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java @@ -0,0 +1,79 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; + +/** + * Handler for a bidirectional topic, supporting both publishing and forwarding of + * incoming messages. + */ +public class BidirectionalTopicHandler extends BidirectionalTopicClient { + + /** + * Listener that will be attached to the topic to receive responses. + */ + private final TopicListenerImpl listener = new TopicListenerImpl(); + + + /** + * Constructs the object. + * + * @param sinkTopic sink topic name + * @param sourceTopic source topic name + * @throws BidirectionalTopicClientException if an error occurs + */ + public BidirectionalTopicHandler(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { + super(sinkTopic, sourceTopic); + } + + /** + * Starts listening on the source topic(s). + */ + public void start() { + getSource().register(listener); + } + + /** + * Stops listening on the source topic(s). + */ + public void stop() { + getSource().unregister(listener); + } + + /** + * Stops listening on the source topic(s) and clears all of the forwarders. + */ + public void shutdown() { + stop(); + listener.shutdown(); + } + + public Forwarder addForwarder(SelectorKey... keys) { + return listener.addForwarder(keys); + } + + public Forwarder addForwarder(List keys) { + return listener.addForwarder(keys); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java new file mode 100644 index 000000000..10411875a --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +/** + * Manages bidirectional topics. + */ +@FunctionalInterface +public interface BidirectionalTopicManager { + + /** + * Gets the topic handler for the given parameters, creating one if it does not exist. + * + * @param sinkTopic sink topic name + * @param sourceTopic source topic name + * @return the topic handler associated with the given sink and source topic names + */ + BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java index 8e9109c9e..2d98b66fc 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java @@ -24,8 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import java.util.function.BiConsumer; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.onap.policy.controlloop.actorserviceprovider.Util; import org.slf4j.Logger; @@ -43,7 +42,7 @@ public class Forwarder { * Maps a set of field values to one or more listeners. */ // @formatter:off - private final Map, Map, String>> + private final Map, Map, String>> values2listeners = new ConcurrentHashMap<>(); // @formatter:on @@ -68,13 +67,13 @@ public class Forwarder { * @param values field values of interest, in one-to-one correspondence with the keys * @param listener listener to register */ - public void register(List values, TriConsumer listener) { + public void register(List values, BiConsumer listener) { if (keys.size() != values.size()) { throw new IllegalArgumentException("key/value mismatch"); } values2listeners.compute(values, (key, listeners) -> { - Map, String> map = listeners; + Map, String> map = listeners; if (map == null) { map = new ConcurrentHashMap<>(); } @@ -90,7 +89,7 @@ public class Forwarder { * @param values field values of interest, in one-to-one correspondence with the keys * @param listener listener to unregister */ - public void unregister(List values, TriConsumer listener) { + public void unregister(List values, BiConsumer listener) { values2listeners.computeIfPresent(values, (key, listeners) -> { listeners.remove(listener); return (listeners.isEmpty() ? null : listeners); @@ -100,11 +99,10 @@ public class Forwarder { /** * Processes a message, forwarding it to the appropriate listeners, if any. * - * @param infra communication infrastructure on which the response was received * @param textMessage original text message that was received * @param scoMessage decoded text message */ - public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) { + public void onMessage(String textMessage, StandardCoderObject scoMessage) { // extract the key values from the message List values = new ArrayList<>(keys.size()); for (SelectorKey key : keys) { @@ -121,8 +119,7 @@ public class Forwarder { } // get the listeners for this set of values - Map, String> listeners = - values2listeners.get(values); + Map, String> listeners = values2listeners.get(values); if (listeners == null) { // no listeners for this particular list of values return; @@ -130,9 +127,9 @@ public class Forwarder { // forward the message to each listener - for (TriConsumer listener : listeners.keySet()) { + for (BiConsumer listener : listeners.keySet()) { try { - listener.accept(infra, textMessage, scoMessage); + listener.accept(textMessage, scoMessage); } catch (RuntimeException e) { logger.warn("exception thrown by listener {}", Util.ident(listener), e); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java index eb805ca5d..fcb463518 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java @@ -98,7 +98,7 @@ public class TopicListenerImpl implements TopicListener { * them all take a crack at it. */ for (Forwarder forwarder : selector2forwarder.values()) { - forwarder.onMessage(infra, message, object); + forwarder.onMessage(message, object); } } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java deleted file mode 100644 index c0cfe2571..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java +++ /dev/null @@ -1,122 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.topic; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A pair of topics, one of which is used to publish requests and the other to receive - * responses. - */ -public class TopicPair extends TopicListenerImpl { - private static final Logger logger = LoggerFactory.getLogger(TopicPair.class); - - @Getter - private final String source; - - @Getter - private final String target; - - private final List publishers; - private final List subscribers; - - /** - * Constructs the object. - * - * @param source source topic name - * @param target target topic name - */ - public TopicPair(String source, String target) { - this.source = source; - this.target = target; - - publishers = getTopicEndpointManager().getTopicSinks(target); - if (publishers.isEmpty()) { - throw new IllegalArgumentException("no sinks for topic: " + target); - } - - subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source)); - if (subscribers.isEmpty()) { - throw new IllegalArgumentException("no sources for topic: " + source); - } - } - - /** - * Starts listening on the source topic(s). - */ - public void start() { - subscribers.forEach(topic -> topic.register(this)); - } - - /** - * Stops listening on the source topic(s). - */ - public void stop() { - subscribers.forEach(topic -> topic.unregister(this)); - } - - /** - * Stops listening on the source topic(s) and clears all of the forwarders. - */ - @Override - public void shutdown() { - stop(); - super.shutdown(); - } - - /** - * Publishes a message to the target topic. - * - * @param message message to be published - * @return a list of the infrastructures on which it was published - */ - public List publish(String message) { - List infrastructures = new ArrayList<>(publishers.size()); - - for (TopicSink topic : publishers) { - try { - topic.send(message); - infrastructures.add(topic.getTopicCommInfrastructure()); - - } catch (RuntimeException e) { - logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e); - } - } - - return infrastructures; - } - - // these may be overridden by junit tests - - protected TopicEndpoint getTopicEndpointManager() { - return TopicEndpointManager.getManager(); - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java deleted file mode 100644 index c351f95f6..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java +++ /dev/null @@ -1,37 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.topic; - -/** - * Manages topic pairs. - */ -@FunctionalInterface -public interface TopicPairManager { - - /** - * Gets the topic pair for the given parameters, creating one if it does not exist. - * - * @param source source topic name - * @param target target topic name - * @return the topic pair associated with the given source and target topics - */ - TopicPair getTopicPair(String source, String target); -} -- cgit