diff options
author | Jim Hahn <jrh3@att.com> | 2020-02-14 14:22:48 -0500 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2020-02-17 11:30:45 -0500 |
commit | 28ca4d196bb0e8c50ad64b5bfde117a93ace3e04 (patch) | |
tree | 3f936b4e622402b710d1003b259057f063adeb7f /models-interactions/model-actors/actorServiceProvider/src/main | |
parent | 170d740e8a1d74875317e86b4266b798ea8baaab (diff) |
Use BidirectionalTopicClient from policy-common
Also modified "target" to sink in various places, and renamed
various uses of "pair" to "bidirectional" (e.g., TopicPairParams
=> BidirectionalTopicParams).
Also replaced MyExec with PseudoExecutor, from policy-common.
As part of this, extracted the logRequest and logResponse methods
from the Http and Topic classes, moving them into the common
OperationPartial class.
Modified A&AI, SDNC junit tests to use PseudoExecutor.
Added support for incomplete responses on Topics, where multiple
responses may be received for one request
Fixed a duplicate entry in actor.aai pom.
As the changes were already big enough, went ahead and also did the
following to support the APPC Actor:
- Reorganized parameter classes and content.
- Modified anyOf, allOf to take functions instead of futures and handle
exceptions thrown by any of the functions. Also added sequence() method.
- Deleted doTask.
- Modified ActorService.config to take a map of maps, not just a map.
- Decided NOT to move anyOf, allOf, and sequence from OperationPartial
to a utility class, because they depend on "params".
Issue-ID: POLICY-2363
Signed-off-by: Jim Hahn <jrh3@att.com>
Change-Id: I5a8bae05dfef22fe71c57c58f265b9dac20df5c5
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
17 files changed, 692 insertions, 710 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java index 2886b1feb..24c2cfc23 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; * {@link #start()} to start all of the actors. When finished using the actor service, * invoke {@link #stop()} or {@link #shutdown()}. */ -public class ActorService extends StartConfigPartial<Map<String, Object>> { +public class ActorService extends StartConfigPartial<Map<String, Map<String, Object>>> { private static final Logger logger = LoggerFactory.getLogger(ActorService.class); private final Map<String, Actor> name2actor; @@ -116,14 +116,14 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { } @Override - protected void doConfigure(Map<String, Object> parameters) { + protected void doConfigure(Map<String, Map<String, Object>> parameters) { logger.info("configuring actors"); BeanValidationResult valres = new BeanValidationResult("ActorService", parameters); for (Actor actor : name2actor.values()) { String actorName = actor.getName(); - Map<String, Object> subparams = Util.translateToMap(actorName, parameters.get(actorName)); + Map<String, Object> subparams = parameters.get(actorName); if (subparams != null) { diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java new file mode 100644 index 000000000..1e44a170c --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager; + +/** + * Actor that uses a bidirectional topic. The actor's parameters must be a + * {@link BidirectionalTopicActorParams}. + */ +public class BidirectionalTopicActor extends ActorImpl implements BidirectionalTopicManager { + + /** + * Maps a pair of sink and source topic names to their bidirectional topic. + */ + private final Map<Pair<String, String>, BidirectionalTopicHandler> params2topic = new ConcurrentHashMap<>(); + + + /** + * Constructs the object. + * + * @param name actor's name + */ + public BidirectionalTopicActor(String name) { + super(name); + } + + @Override + protected void doStart() { + params2topic.values().forEach(BidirectionalTopicHandler::start); + super.doStart(); + } + + @Override + protected void doStop() { + params2topic.values().forEach(BidirectionalTopicHandler::stop); + super.doStop(); + } + + @Override + protected void doShutdown() { + params2topic.values().forEach(BidirectionalTopicHandler::shutdown); + params2topic.clear(); + super.doShutdown(); + } + + @Override + public BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic) { + Pair<String, String> key = Pair.of(sinkTopic, sourceTopic); + + return params2topic.computeIfAbsent(key, pair -> { + try { + return makeTopicHandler(sinkTopic, sourceTopic); + } catch (BidirectionalTopicClientException e) { + throw new IllegalArgumentException(e); + } + }); + } + + /** + * Translates the parameters to a {@link BidirectionalTopicActorParams} and then + * creates a function that will extract operator-specific parameters. + */ + @Override + protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { + String actorName = getName(); + + // @formatter:off + return Util.translate(actorName, actorParameters, BidirectionalTopicActorParams.class) + .doValidation(actorName) + .makeOperationParameters(actorName); + // @formatter:on + } + + // may be overridden by junit tests + + protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic) + throws BidirectionalTopicClientException { + + return new BidirectionalTopicHandler(sinkTopic, sourceTopic); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java index 6b584d7c6..f82015d6b 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java @@ -24,40 +24,42 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import lombok.Getter; -import org.apache.commons.lang3.tuple.Triple; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.NetLoggerUtil; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; -import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Operation that uses a Topic pair. + * Operation that uses a bidirectional topic. * * @param <S> response type */ @Getter -public abstract class TopicPairOperation<Q, S> extends OperationPartial { - private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class); - private static final Coder coder = new StandardCoder(); +public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial { + private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class); + + /** + * Response status. + */ + public enum Status { + SUCCESS, FAILURE, STILL_WAITING + } // fields extracted from the operator - private final TopicPair topicPair; + private final BidirectionalTopicHandler topicHandler; private final Forwarder forwarder; - private final TopicPairParams pairParams; + private final BidirectionalTopicParams topicParams; private final long timeoutMs; /** @@ -73,13 +75,14 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { * @param operator operator that created this operation * @param clazz response class */ - public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) { + public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator, + Class<S> clazz) { super(params, operator); - this.topicPair = operator.getTopicPair(); + this.topicHandler = operator.getTopicHandler(); this.forwarder = operator.getForwarder(); - this.pairParams = operator.getParams(); + this.topicParams = operator.getParams(); this.responseClass = clazz; - this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS); + this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS); } /** @@ -101,18 +104,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request); final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future = - new CompletableFuture<>(); final Executor executor = params.getExecutor(); // register a listener BEFORE publishing - // @formatter:off - TriConsumer<CommInfrastructure, String, StandardCoderObject> listener = - (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse)); - // @formatter:on - - // TODO this currently only allows a single matching response + BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> { + OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse); + if (latestOutcome != null) { + // final response - complete the controller + controller.completeAsync(() -> latestOutcome, executor); + } + }; forwarder.register(expectedKeyValues, listener); @@ -128,16 +130,6 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { throw e; } - - // once "future" completes, process the response, and then complete the controller - - // @formatter:off - future.thenApplyAsync( - triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()), - executor) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on - return controller; } @@ -175,12 +167,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { throw new IllegalArgumentException("cannot encode request", e); } - List<CommInfrastructure> list = topicPair.publish(json); - if (list.isEmpty()) { + if (!topicHandler.send(json)) { throw new IllegalStateException("nothing published"); } - logTopicRequest(list, request); + logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request); } /** @@ -190,15 +181,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { * @param outcome outcome to be populated * @param response raw response to process * @param scoResponse response, as a {@link StandardCoderObject} - * @return the outcome + * @return the outcome, or {@code null} if still waiting for completion */ - protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse, + protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse, StandardCoderObject scoResponse) { logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); - logTopicResponse(infra, rawResponse); + logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(), + rawResponse); + // decode the response S response; if (responseClass == String.class) { response = responseClass.cast(rawResponse); @@ -216,17 +209,26 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { } } - if (!isSuccess(rawResponse, response)) { - logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), - params.getRequestId()); - return setOutcome(outcome, PolicyResult.FAILURE); - } + // check its status + switch (detmStatus(rawResponse, response)) { + case SUCCESS: + logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + setOutcome(outcome, PolicyResult.SUCCESS); + postProcessResponse(outcome, rawResponse, response); + return outcome; - logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); - setOutcome(outcome, PolicyResult.SUCCESS); - postProcessResponse(outcome, rawResponse, response); + case FAILURE: + logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + return setOutcome(outcome, PolicyResult.FAILURE); - return outcome; + case STILL_WAITING: + default: + logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + return null; + } } /** @@ -241,76 +243,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { } /** - * Determines if the response indicates success. + * Determines the status of the response. * * @param rawResponse raw response * @param response decoded response - * @return {@code true} if the response indicates success, {@code false} otherwise - */ - protected abstract boolean isSuccess(String rawResponse, S response); - - /** - * Logs a TOPIC request. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param infrastructures list of communication infrastructures on which it was - * published - * @param request request to be logged - */ - protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) { - if (infrastructures.isEmpty()) { - return; - } - - String json; - try { - if (request == null) { - json = null; - } else if (request instanceof String) { - json = request.toString(); - } else { - json = makeCoder().encode(request, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print request", e); - json = request.toString(); - } - - for (CommInfrastructure infra : infrastructures) { - logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json); - } - } - - /** - * Logs a TOPIC response. If the response is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param infra communication infrastructure on which the response was received - * @param response response to be logged + * @return the status of the response */ - protected <T> void logTopicResponse(CommInfrastructure infra, T response) { - String json; - try { - if (response == null) { - json = null; - } else if (response instanceof String) { - json = response.toString(); - } else { - json = makeCoder().encode(response, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print response", e); - json = response.toString(); - } - - logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json); - } - - // these may be overridden by junit tests - - protected Coder makeCoder() { - return coder; - } + protected abstract Status detmStatus(String rawResponse, S response); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java index 8ce013388..51689e49b 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java @@ -28,24 +28,24 @@ import lombok.Getter; import org.onap.policy.common.parameters.ValidationResult; import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager; import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; /** - * Operator that uses a pair of topics, one for publishing the request, and another for - * receiving the response. Topic operators may share a {@link TopicPair}. + * Operator that uses a bidirectional topic. Topic operators may share a + * {@link BidirectionalTopicHandler}. */ -public abstract class TopicPairOperator extends OperatorPartial { +public abstract class BidirectionalTopicOperator extends OperatorPartial { /** - * Manager from which to get the topic pair. + * Manager from which to get the topic handlers. */ - private final TopicPairManager pairManager; + private final BidirectionalTopicManager topicManager; /** * Keys used to extract the fields used to select responses for this operator. @@ -62,13 +62,13 @@ public abstract class TopicPairOperator extends OperatorPartial { * will not, thus operations may copy it. */ @Getter - private TopicPairParams params; + private BidirectionalTopicParams params; /** - * Topic pair associated with the parameters. + * Topic handler associated with the parameters. */ @Getter - private TopicPair topicPair; + private BidirectionalTopicHandler topicHandler; /** * Forwarder associated with the parameters. @@ -82,27 +82,27 @@ public abstract class TopicPairOperator extends OperatorPartial { * * @param actorName name of the actor with which this operator is associated * @param name operation name - * @param pairManager manager from which to get the topic pair + * @param topicManager manager from which to get the topic handler * @param selectorKeys keys used to extract the fields used to select responses for * this operator */ - public TopicPairOperator(String actorName, String name, TopicPairManager pairManager, + public BidirectionalTopicOperator(String actorName, String name, BidirectionalTopicManager topicManager, List<SelectorKey> selectorKeys) { super(actorName, name); - this.pairManager = pairManager; + this.topicManager = topicManager; this.selectorKeys = selectorKeys; } @Override protected void doConfigure(Map<String, Object> parameters) { - params = Util.translate(getFullName(), parameters, TopicPairParams.class); + params = Util.translate(getFullName(), parameters, BidirectionalTopicParams.class); ValidationResult result = params.validate(getFullName()); if (!result.isValid()) { throw new ParameterValidationRuntimeException("invalid parameters", result); } - topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget()); - forwarder = topicPair.addForwarder(selectorKeys); + topicHandler = topicManager.getTopicHandler(params.getSinkTopic(), params.getSourceTopic()); + forwarder = topicHandler.addForwarder(selectorKeys); } /** @@ -112,19 +112,21 @@ public abstract class TopicPairOperator extends OperatorPartial { * @param <S> response type * @param actorName actor name * @param operation operation name - * @param pairManager manager from which to get the topic pair + * @param topicManager manager from which to get the topic handler * @param operationMaker function to make an operation * @param keys keys used to extract the fields used to select responses for this * operator * @return a new operator */ // @formatter:off - public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, - BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker, + public static <Q, S> BidirectionalTopicOperator makeOperator(String actorName, String operation, + BidirectionalTopicManager topicManager, + BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator, + BidirectionalTopicOperation<Q, S>> operationMaker, SelectorKey... keys) { // @formatter:off - return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker); + return makeOperator(actorName, operation, topicManager, Arrays.asList(keys), operationMaker); } /** @@ -134,19 +136,21 @@ public abstract class TopicPairOperator extends OperatorPartial { * @param <S> response type * @param actorName actor name * @param operation operation name - * @param pairManager manager from which to get the topic pair + * @param topicManager manager from which to get the topic handler * @param keys keys used to extract the fields used to select responses for * this operator * @param operationMaker function to make an operation * @return a new operator */ // @formatter:off - public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, + public static <Q,S> BidirectionalTopicOperator makeOperator(String actorName, String operation, + BidirectionalTopicManager topicManager, List<SelectorKey> keys, - BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) { + BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator, + BidirectionalTopicOperation<Q,S>> operationMaker) { // @formatter:on - return new TopicPairOperator(actorName, operation, pairManager, keys) { + return new BidirectionalTopicOperator(actorName, operation, topicManager, keys) { @Override public synchronized Operation buildOperation(ControlLoopOperationParams params) { return operationMaker.apply(params, this); diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java index ba75f0be6..f1829d79a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java @@ -33,9 +33,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; -import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; @@ -52,7 +50,6 @@ import org.slf4j.LoggerFactory; @Getter public abstract class HttpOperation<T> extends OperationPartial { private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class); - private static final Coder coder = new StandardCoder(); /** * Operator that created this operation. @@ -171,7 +168,7 @@ public abstract class HttpOperation<T> extends OperationPartial { String strResponse = HttpClient.getBody(rawResponse, String.class); - logRestResponse(url, strResponse); + logMessage(EventType.IN, CommInfrastructure.REST, url, strResponse); T response; if (responseClass == String.class) { @@ -224,63 +221,10 @@ public abstract class HttpOperation<T> extends OperationPartial { return (rawResponse.getStatus() == 200); } - /** - * Logs a REST request. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param url request URL - * @param request request to be logged - */ - public <Q> void logRestRequest(String url, Q request) { - String json; - try { - if (request == null) { - json = null; - } else if (request instanceof String) { - json = request.toString(); - } else { - json = makeCoder().encode(request, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print request", e); - json = request.toString(); - } - - NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); - logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); - } - - /** - * Logs a REST response. If the response is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param url request URL - * @param response response to be logged - */ - public <S> void logRestResponse(String url, S response) { - String json; - try { - if (response == null) { - json = null; - } else if (response instanceof String) { - json = response.toString(); - } else { - json = makeCoder().encode(response, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print response", e); - json = response.toString(); - } - - NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); - logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); - } - - // these may be overridden by junit tests - - protected Coder makeCoder() { - return coder; + @Override + public <Q> String logMessage(EventType direction, CommInfrastructure infra, String sink, Q request) { + String json = super.logMessage(direction, infra, sink, request); + NetLoggerUtil.log(direction, infra, sink, json); + return json; } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java index d00b88bb5..0b3497197 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -20,7 +20,12 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -28,6 +33,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; import org.onap.policy.controlloop.actorserviceprovider.Operation; @@ -53,8 +66,8 @@ import org.slf4j.LoggerFactory; * be done to cancel that particular operation. */ public abstract class OperationPartial implements Operation { - private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class); + private static final Coder coder = new StandardCoder(); public static final long DEFAULT_RETRY_WAIT_MS = 1000L; // values extracted from the operator @@ -470,103 +483,110 @@ public abstract class OperationPartial implements Operation { * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels * any outstanding futures when one completes. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled */ - protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + protected CompletableFuture<OperationOutcome> anyOf( + @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) { - @SuppressWarnings("unchecked") - CompletableFuture<OperationOutcome> result = anyOf(arrFutures); - return result; + return anyOf(Arrays.asList(futureMakers)); } /** - * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any - * outstanding futures when one completes. + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled. Similarly, when this future completes, any incomplete futures + * will be canceled */ protected CompletableFuture<OperationOutcome> anyOf( - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) { + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome>[] futures = + attachFutures(controller, futureMakers, UnaryOperator.identity()); + + if (futures.length == 0) { + // no futures were started + return null; + } if (futures.length == 1) { return futures[0]; } - final Executor executor = params.getExecutor(); - final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); - - // @formatter:off - CompletableFuture.anyOf(futures) - .thenApply(object -> (OperationOutcome) object) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on + CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome) + .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); return controller; } /** - * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels - * the futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled */ - protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + protected CompletableFuture<OperationOutcome> allOf( + @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) { - @SuppressWarnings("unchecked") - CompletableFuture<OperationOutcome> result = allOf(arrFutures); - return result; + return allOf(Arrays.asList(futureMakers)); } /** - * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the - * futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled. Similarly, when this future completes, any incomplete futures + * will be canceled */ protected CompletableFuture<OperationOutcome> allOf( - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { - - if (futures.length == 1) { - return futures[0]; - } + List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) { + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); + Queue<OperationOutcome> outcomes = new LinkedList<>(); - OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + CompletableFuture<OperationOutcome>[] futures = + attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> { + synchronized (outcomes) { + outcomes.add(outcome); + } + return outcome; + })); - @SuppressWarnings("rawtypes") - CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + if (futures.length == 0) { + // no futures were started + return null; + } - // record the outcomes of each future when it completes - for (int count = 0; count < futures2.length; ++count) { - final int count2 = count; - futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); + if (futures.length == 1) { + return futures[0]; } // @formatter:off - CompletableFuture.allOf(futures2) + CompletableFuture.allOf(futures) .thenApply(unused -> combineOutcomes(outcomes)) .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); // @formatter:on @@ -575,22 +595,62 @@ public abstract class OperationPartial implements Operation { } /** - * Attaches the given futures to the controller. + * Invokes the functions to create the futures and attaches them to the controller. * * @param controller master controller for all of the futures - * @param futures futures to be attached to the controller - */ - private void attachFutures(PipelineControllerFuture<OperationOutcome> controller, - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + * @param futureMakers futures to be attached to the controller + * @param adorn function that "adorns" the future, possible adding onto its pipeline. + * Returns the adorned future + * @return an array of futures, possibly zero-length. If the array is of size one, + * then that one item should be returned instead of the controller + */ + private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller, + List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers, + UnaryOperator<CompletableFuture<OperationOutcome>> adorn) { + + if (futureMakers.isEmpty()) { + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0]; + return result; + } - if (futures.length == 0) { - throw new IllegalArgumentException("empty list of futures"); + // the last, unadorned future that is created + CompletableFuture<OperationOutcome> lastFuture = null; + + List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size()); + + // make each future + for (var maker : futureMakers) { + try { + CompletableFuture<OperationOutcome> future = maker.get(); + if (future == null) { + continue; + } + + // propagate "stop" to the future + controller.add(future); + + futures.add(adorn.apply(future)); + + lastFuture = future; + + } catch (RuntimeException e) { + logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId()); + controller.cancel(false); + throw e; + } } - // attach each task - for (CompletableFuture<OperationOutcome> future : futures) { - controller.add(future); + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()]; + + if (result.length == 1) { + // special case - return the unadorned future + result[0] = lastFuture; + return result; } + + return futures.toArray(result); } /** @@ -599,15 +659,13 @@ public abstract class OperationPartial implements Operation { * @param outcomes outcomes to be examined * @return the combined outcome */ - private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) { + private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) { // identify the outcome with the highest priority - OperationOutcome outcome = outcomes[0]; + OperationOutcome outcome = outcomes.remove(); int priority = detmPriority(outcome); - // start with "1", as we've already dealt with "0" - for (int count = 1; count < outcomes.length; ++count) { - OperationOutcome outcome2 = outcomes[count]; + for (OperationOutcome outcome2 : outcomes) { int priority2 = detmPriority(outcome2); if (priority2 > priority) { @@ -656,72 +714,114 @@ public abstract class OperationPartial implements Operation { } /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. + * Performs a sequence of tasks, stopping if a task fails. A given task's future is + * not created until the previous task completes. The pipeline returns the outcome of + * the last task executed. * - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param outcome outcome of the previous task - * @param task task to be performed - * @return the task, if everything checks out. Otherwise, it returns an incomplete - * future and completes the controller instead + * @param futureMakers functions to make the futures + * @return a future to cancel the sequence or await the outcome */ - // @formatter:off - protected CompletableFuture<OperationOutcome> doTask( - PipelineControllerFuture<OperationOutcome> controller, - boolean checkSuccess, OperationOutcome outcome, - CompletableFuture<OperationOutcome> task) { - // @formatter:on + protected CompletableFuture<OperationOutcome> sequence( + @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) { - if (checkSuccess && !isSuccess(outcome)) { - /* - * must complete before canceling so that cancel() doesn't cause controller to - * complete - */ - controller.complete(outcome); - task.cancel(false); - return new CompletableFuture<>(); + return sequence(Arrays.asList(futureMakers)); + } + + /** + * Performs a sequence of tasks, stopping if a task fails. A given task's future is + * not created until the previous task completes. The pipeline returns the outcome of + * the last task executed. + * + * @param futureMakers functions to make the futures + * @return a future to cancel the sequence or await the outcome, or {@code null} if + * there were no tasks to perform + */ + protected CompletableFuture<OperationOutcome> sequence( + List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) { + + Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers); + + CompletableFuture<OperationOutcome> nextTask = getNextTask(queue); + if (nextTask == null) { + // no tasks + return null; + } + + if (queue.isEmpty()) { + // only one task - just return it rather than wrapping it in a controller + return nextTask; } - return controller.wrap(task); + /* + * multiple tasks - need a controller to stop whichever task is currently + * executing + */ + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + final Executor executor = params.getExecutor(); + + // @formatter:off + controller.wrap(nextTask) + .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; } /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. + * Executes the next task in the queue, if the previous outcome was successful. * - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param task function to start the task to be performed - * @return a function to perform the task. If everything checks out, then it returns - * the task. Otherwise, it returns an incomplete future and completes the - * controller instead + * @param controller pipeline controller + * @param taskQueue queue of tasks to be performed + * @return a future to execute the remaining tasks, or the current outcome, if it's a + * failure, or if there are no more tasks */ - // @formatter:off - protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask( + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess( PipelineControllerFuture<OperationOutcome> controller, - boolean checkSuccess, - Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) { - // @formatter:on + Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) { return outcome -> { - - if (!controller.isRunning()) { - return new CompletableFuture<>(); + if (!isSuccess(outcome)) { + // return the failure + return CompletableFuture.completedFuture(outcome); } - if (checkSuccess && !isSuccess(outcome)) { - controller.complete(outcome); - return new CompletableFuture<>(); + CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue); + if (nextTask == null) { + // no tasks - just return the success + return CompletableFuture.completedFuture(outcome); } - return controller.wrap(task.apply(outcome)); + // @formatter:off + return controller + .wrap(nextTask) + .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor()); + // @formatter:on }; } /** + * Gets the next task from the queue, skipping those that are {@code null}. + * + * @param taskQueue task queue + * @return the next task, or {@code null} if the queue is now empty + */ + private CompletableFuture<OperationOutcome> getNextTask( + Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) { + + Supplier<CompletableFuture<OperationOutcome>> maker; + + while ((maker = taskQueue.poll()) != null) { + CompletableFuture<OperationOutcome> future = maker.get(); + if (future != null) { + return future; + } + } + + return null; + } + + /** * Sets the start time of the operation and invokes the callback to indicate that the * operation has started. Does nothing if the pipeline has been stopped. * <p/> @@ -809,6 +909,38 @@ public abstract class OperationPartial implements Operation { return (thrown instanceof TimeoutException); } + /** + * Logs a response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param direction IN or OUT + * @param infra communication infrastructure on which it was published + * @param source source name (e.g., the URL or Topic name) + * @param response response to be logged + * @return the JSON text that was logged + */ + public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = makeCoder().encode(response, true); + } + + } catch (CoderException e) { + String type = (direction == EventType.IN ? "response" : "request"); + logger.warn("cannot pretty-print {}", type, e); + json = response.toString(); + } + + logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json); + + return json; + } + // these may be overridden by subclasses or junit tests /** @@ -841,4 +973,10 @@ public abstract class OperationPartial implements Operation { protected long getTimeoutMs(Integer timeoutSec) { return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); } + + // these may be overridden by junit tests + + protected Coder makeCoder() { + return coder; + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java deleted file mode 100644 index c3e1e5c4d..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java +++ /dev/null @@ -1,112 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.impl; - -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; -import org.apache.commons.lang3.tuple.Pair; -import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.controlloop.actorserviceprovider.Util; -import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; - -/** - * Actor that uses a topic pair. The actor's parameters must be a - * {@link TopicPairActorParams}. - */ -public class TopicPairActor extends ActorImpl implements TopicPairManager { - - /** - * Maps a topic source and target name to its topic pair. - */ - private final Map<Pair<String, String>, TopicPair> params2topic = new ConcurrentHashMap<>(); - - - /** - * Constructs the object. - * - * @param name actor's name - */ - public TopicPairActor(String name) { - super(name); - } - - @Override - protected void doStart() { - params2topic.values().forEach(TopicPair::start); - super.doStart(); - } - - @Override - protected void doStop() { - params2topic.values().forEach(TopicPair::stop); - super.doStop(); - } - - @Override - protected void doShutdown() { - params2topic.values().forEach(TopicPair::shutdown); - params2topic.clear(); - super.doShutdown(); - } - - @Override - public TopicPair getTopicPair(String source, String target) { - Pair<String, String> key = Pair.of(source, target); - return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target)); - } - - /** - * Translates the parameters to a {@link TopicPairActorParams} and then creates a - * function that will extract operator-specific parameters. - */ - @Override - protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { - String actorName = getName(); - - TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class); - ValidationResult result = params.validate(getName()); - if (!result.isValid()) { - throw new ParameterValidationRuntimeException("invalid parameters", result); - } - - // create a map of the default parameters - Map<String, Object> defaultParams = Util.translateToMap(getName(), params.getDefaults()); - Map<String, Map<String, Object>> operations = params.getOperation(); - - return operationName -> { - Map<String, Object> specificParams = operations.get(operationName); - if (specificParams == null) { - return null; - } - - // start with a copy of defaults and overlay with specific - Map<String, Object> subparams = new TreeMap<>(defaultParams); - subparams.putAll(specificParams); - - return Util.translateToMap(getName() + "." + operationName, subparams); - }; - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java new file mode 100644 index 000000000..291aeeb23 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.parameters.annotations.Min; + +/** + * Parameters used by Actors whose Operators use bidirectional topic. + */ +@Getter +@Setter +@EqualsAndHashCode(callSuper = true) +public class BidirectionalTopicActorParams extends CommonActorParams { + + /* + * Optional, default values that are used if missing from the operation-specific + * parameters. + */ + + /** + * Sink topic name to which requests should be published. + */ + private String sinkTopic; + + /** + * Source topic name, from which to read responses. + */ + private String sourceTopic; + + /** + * Amount of time, in seconds, to wait for the HTTP request to complete. The default + * is 90 seconds. + */ + @Min(1) + private int timeoutSec = 90; +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java index 33fcf3052..cafca1fa6 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java @@ -29,31 +29,33 @@ import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; /** - * Parameters used by Operators that use a pair of Topics, one to publish requests and the - * other to receive responses. + * Parameters used by Operators that use a bidirectional topic. */ @NotNull @NotBlank @Data @Builder(toBuilder = true) -public class TopicPairParams { +public class BidirectionalTopicParams { /** - * Source topic end point, from which to read responses. + * Sink topic name to which requests should be published. */ - private String source; + private String sinkTopic; /** - * Name of the target topic end point to which requests should be published. + * Source topic name, from which to read responses. */ - private String target; + private String sourceTopic; /** - * Amount of time, in seconds to wait for the response. The default is five minutes. + * Amount of time, in seconds to wait for the response. + * <p/> + * Note: this should NOT have a default value, as it receives its default value from + * {@link BidirectionalTopicActorParams}. */ @Min(1) - @Builder.Default - private int timeoutSec = 300; + private int timeoutSec; + /** * Validates the parameters. diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java index 42a44ee9c..dc6f2b657 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java @@ -21,37 +21,57 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; import java.util.Map; -import lombok.Builder; -import lombok.Data; -import org.onap.policy.common.parameters.BeanValidationResult; +import java.util.TreeMap; +import java.util.function.Function; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.parameters.BeanValidator; import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; +import org.onap.policy.controlloop.actorserviceprovider.Util; /** - * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests - * and the other to receive responses. + * Superclass for Actor parameters that have default values in "this" object, and + * operation-specific values in {@link #operation}. */ -@NotNull -@NotBlank -@Data -@Builder -public class TopicPairActorParams { +@Getter +@Setter +@EqualsAndHashCode +public class CommonActorParams { /** - * This contains the default parameters that are used when an operation doesn't - * specify them. Note: each operation to be used must still have an entry in - * {@link #operation}, even if it's empty. Otherwise, the given operation will not be - * started. + * Maps the operation name to its parameters. */ - private TopicPairParams defaults; + @NotNull + protected Map<String, Map<String, Object>> operation; + /** - * Maps an operation name to its individual parameters. + * Extracts a specific operation's parameters from "this". + * + * @param name name of the item containing "this" + * @return a function to extract an operation's parameters from "this". Note: the + * returned function is not thread-safe */ - private Map<String, Map<String, Object>> operation; + public Function<String, Map<String, Object>> makeOperationParameters(String name) { + + Map<String, Object> defaultParams = Util.translateToMap(name, this); + defaultParams.remove("operation"); + + return operationName -> { + Map<String, Object> specificParams = operation.get(operationName); + if (specificParams == null) { + return null; + } + + // start with a copy of defaults and overlay with specific + Map<String, Object> subparams = new TreeMap<>(defaultParams); + subparams.putAll(specificParams); + return Util.translateToMap(name + "." + operationName, subparams); + }; + } /** * Validates the parameters. @@ -60,7 +80,7 @@ public class TopicPairActorParams { * @return "this" * @throws IllegalArgumentException if the parameters are invalid */ - public TopicPairActorParams doValidation(String name) { + public CommonActorParams doValidation(String name) { ValidationResult result = validate(name); if (!result.isValid()) { throw new ParameterValidationRuntimeException("invalid parameters", result); @@ -77,17 +97,6 @@ public class TopicPairActorParams { * @return the validation result */ public ValidationResult validate(String resultName) { - BeanValidationResult result = new BeanValidator().validateTop(resultName, this); - - if (defaults != null) { - result.addResult(defaults.validate("defaults")); - } - - // @formatter:off - result.validateMap("operation", operation, - (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); - // @formatter:on - - return result; + return new BeanValidator().validateTop(resultName, this); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java index 275c8bc4e..d589e1d7e 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java @@ -20,26 +20,23 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; -import java.util.Map; -import java.util.function.Function; -import lombok.Data; -import org.onap.policy.common.parameters.BeanValidationResult; -import org.onap.policy.common.parameters.BeanValidator; -import org.onap.policy.common.parameters.ValidationResult; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotBlank; -import org.onap.policy.common.parameters.annotations.NotNull; -import org.onap.policy.controlloop.actorserviceprovider.Util; /** - * Parameters used by Actors that connect to a server via HTTP. This contains the - * parameters that are common to all of the operations. Only the path changes for each - * operation, thus it includes a mapping from operation name to path. + * Parameters used by Actors that connect to a server via HTTP. */ -@Data -@NotNull -@NotBlank -public class HttpActorParams { +@Getter +@Setter +@EqualsAndHashCode(callSuper = true) +public class HttpActorParams extends CommonActorParams { + + /* + * Optional, default values that are used if missing from the operation-specific + * parameters. + */ /** * Name of the HttpClient, as found in the HttpClientFactory. @@ -47,66 +44,9 @@ public class HttpActorParams { private String clientName; /** - * Amount of time, in seconds to wait for the HTTP request to complete, where zero - * indicates that it should wait forever. The default is zero. - */ - @Min(0) - private int timeoutSec = 0; - - /** - * Maps the operation name to its URI path. - */ - private Map<String, String> path; - - /** - * Extracts a specific operation's parameters from "this". - * - * @param name name of the item containing "this" - * @return a function to extract an operation's parameters from "this". Note: the - * returned function is not thread-safe - */ - public Function<String, Map<String, Object>> makeOperationParameters(String name) { - HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build(); - - return operation -> { - String subpath = path.get(operation); - if (subpath == null) { - return null; - } - - subparams.setPath(subpath); - return Util.translateToMap(name + "." + operation, subparams); - }; - } - - /** - * Validates the parameters. - * - * @param name name of the object containing these parameters - * @return "this" - * @throws IllegalArgumentException if the parameters are invalid + * Amount of time, in seconds, to wait for the HTTP request to complete. The default + * is 90 seconds. */ - public HttpActorParams doValidation(String name) { - ValidationResult result = validate(name); - if (!result.isValid()) { - throw new ParameterValidationRuntimeException("invalid parameters", result); - } - - return this; - } - - /** - * Validates the parameters. - * - * @param resultName name of the result - * - * @return the validation result - */ - public ValidationResult validate(String resultName) { - BeanValidationResult result = new BeanValidator().validateTop(resultName, this); - - result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); - - return result; - } + @Min(1) + private int timeoutSec = 90; } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java index 93711c032..2d3ab8b54 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java @@ -48,12 +48,13 @@ public class HttpParams { private String path; /** - * Amount of time, in seconds to wait for the HTTP request to complete, where zero - * indicates that it should wait forever. The default is zero. + * Amount of time, in seconds, to wait for the HTTP request to complete. + * <p/> + * Note: this should NOT have a default value, as it receives its default value from + * {@link HttpActorParams}. */ - @Min(0) - @Builder.Default - private int timeoutSec = 0; + @Min(1) + private int timeoutSec; /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java new file mode 100644 index 000000000..30ee1e2d0 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java @@ -0,0 +1,79 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; + +/** + * Handler for a bidirectional topic, supporting both publishing and forwarding of + * incoming messages. + */ +public class BidirectionalTopicHandler extends BidirectionalTopicClient { + + /** + * Listener that will be attached to the topic to receive responses. + */ + private final TopicListenerImpl listener = new TopicListenerImpl(); + + + /** + * Constructs the object. + * + * @param sinkTopic sink topic name + * @param sourceTopic source topic name + * @throws BidirectionalTopicClientException if an error occurs + */ + public BidirectionalTopicHandler(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { + super(sinkTopic, sourceTopic); + } + + /** + * Starts listening on the source topic(s). + */ + public void start() { + getSource().register(listener); + } + + /** + * Stops listening on the source topic(s). + */ + public void stop() { + getSource().unregister(listener); + } + + /** + * Stops listening on the source topic(s) and clears all of the forwarders. + */ + public void shutdown() { + stop(); + listener.shutdown(); + } + + public Forwarder addForwarder(SelectorKey... keys) { + return listener.addForwarder(keys); + } + + public Forwarder addForwarder(List<SelectorKey> keys) { + return listener.addForwarder(keys); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java index c351f95f6..10411875a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java @@ -21,17 +21,17 @@ package org.onap.policy.controlloop.actorserviceprovider.topic; /** - * Manages topic pairs. + * Manages bidirectional topics. */ @FunctionalInterface -public interface TopicPairManager { +public interface BidirectionalTopicManager { /** - * Gets the topic pair for the given parameters, creating one if it does not exist. + * Gets the topic handler for the given parameters, creating one if it does not exist. * - * @param source source topic name - * @param target target topic name - * @return the topic pair associated with the given source and target topics + * @param sinkTopic sink topic name + * @param sourceTopic source topic name + * @return the topic handler associated with the given sink and source topic names */ - TopicPair getTopicPair(String source, String target); + BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java index 8e9109c9e..2d98b66fc 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java @@ -24,8 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import java.util.function.BiConsumer; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.onap.policy.controlloop.actorserviceprovider.Util; import org.slf4j.Logger; @@ -43,7 +42,7 @@ public class Forwarder { * Maps a set of field values to one or more listeners. */ // @formatter:off - private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>> + private final Map<List<String>, Map<BiConsumer<String, StandardCoderObject>, String>> values2listeners = new ConcurrentHashMap<>(); // @formatter:on @@ -68,13 +67,13 @@ public class Forwarder { * @param values field values of interest, in one-to-one correspondence with the keys * @param listener listener to register */ - public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) { + public void register(List<String> values, BiConsumer<String, StandardCoderObject> listener) { if (keys.size() != values.size()) { throw new IllegalArgumentException("key/value mismatch"); } values2listeners.compute(values, (key, listeners) -> { - Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners; + Map<BiConsumer<String, StandardCoderObject>, String> map = listeners; if (map == null) { map = new ConcurrentHashMap<>(); } @@ -90,7 +89,7 @@ public class Forwarder { * @param values field values of interest, in one-to-one correspondence with the keys * @param listener listener to unregister */ - public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) { + public void unregister(List<String> values, BiConsumer<String, StandardCoderObject> listener) { values2listeners.computeIfPresent(values, (key, listeners) -> { listeners.remove(listener); return (listeners.isEmpty() ? null : listeners); @@ -100,11 +99,10 @@ public class Forwarder { /** * Processes a message, forwarding it to the appropriate listeners, if any. * - * @param infra communication infrastructure on which the response was received * @param textMessage original text message that was received * @param scoMessage decoded text message */ - public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) { + public void onMessage(String textMessage, StandardCoderObject scoMessage) { // extract the key values from the message List<String> values = new ArrayList<>(keys.size()); for (SelectorKey key : keys) { @@ -121,8 +119,7 @@ public class Forwarder { } // get the listeners for this set of values - Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners = - values2listeners.get(values); + Map<BiConsumer<String, StandardCoderObject>, String> listeners = values2listeners.get(values); if (listeners == null) { // no listeners for this particular list of values return; @@ -130,9 +127,9 @@ public class Forwarder { // forward the message to each listener - for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) { + for (BiConsumer<String, StandardCoderObject> listener : listeners.keySet()) { try { - listener.accept(infra, textMessage, scoMessage); + listener.accept(textMessage, scoMessage); } catch (RuntimeException e) { logger.warn("exception thrown by listener {}", Util.ident(listener), e); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java index eb805ca5d..fcb463518 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java @@ -98,7 +98,7 @@ public class TopicListenerImpl implements TopicListener { * them all take a crack at it. */ for (Forwarder forwarder : selector2forwarder.values()) { - forwarder.onMessage(infra, message, object); + forwarder.onMessage(message, object); } } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java deleted file mode 100644 index c0cfe2571..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java +++ /dev/null @@ -1,122 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.topic; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A pair of topics, one of which is used to publish requests and the other to receive - * responses. - */ -public class TopicPair extends TopicListenerImpl { - private static final Logger logger = LoggerFactory.getLogger(TopicPair.class); - - @Getter - private final String source; - - @Getter - private final String target; - - private final List<TopicSink> publishers; - private final List<TopicSource> subscribers; - - /** - * Constructs the object. - * - * @param source source topic name - * @param target target topic name - */ - public TopicPair(String source, String target) { - this.source = source; - this.target = target; - - publishers = getTopicEndpointManager().getTopicSinks(target); - if (publishers.isEmpty()) { - throw new IllegalArgumentException("no sinks for topic: " + target); - } - - subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source)); - if (subscribers.isEmpty()) { - throw new IllegalArgumentException("no sources for topic: " + source); - } - } - - /** - * Starts listening on the source topic(s). - */ - public void start() { - subscribers.forEach(topic -> topic.register(this)); - } - - /** - * Stops listening on the source topic(s). - */ - public void stop() { - subscribers.forEach(topic -> topic.unregister(this)); - } - - /** - * Stops listening on the source topic(s) and clears all of the forwarders. - */ - @Override - public void shutdown() { - stop(); - super.shutdown(); - } - - /** - * Publishes a message to the target topic. - * - * @param message message to be published - * @return a list of the infrastructures on which it was published - */ - public List<CommInfrastructure> publish(String message) { - List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size()); - - for (TopicSink topic : publishers) { - try { - topic.send(message); - infrastructures.add(topic.getTopicCommInfrastructure()); - - } catch (RuntimeException e) { - logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e); - } - } - - return infrastructures; - } - - // these may be overridden by junit tests - - protected TopicEndpoint getTopicEndpointManager() { - return TopicEndpointManager.getManager(); - } -} |