summaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors/actorServiceProvider/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java6
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java108
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java (renamed from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java)175
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java (renamed from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java)54
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java68
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java386
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java112
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java57
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java (renamed from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java)22
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java (renamed from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java)71
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java94
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java11
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java79
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java (renamed from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java)14
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java21
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java2
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java122
17 files changed, 692 insertions, 710 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
index 2886b1feb..24c2cfc23 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
* {@link #start()} to start all of the actors. When finished using the actor service,
* invoke {@link #stop()} or {@link #shutdown()}.
*/
-public class ActorService extends StartConfigPartial<Map<String, Object>> {
+public class ActorService extends StartConfigPartial<Map<String, Map<String, Object>>> {
private static final Logger logger = LoggerFactory.getLogger(ActorService.class);
private final Map<String, Actor> name2actor;
@@ -116,14 +116,14 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> {
}
@Override
- protected void doConfigure(Map<String, Object> parameters) {
+ protected void doConfigure(Map<String, Map<String, Object>> parameters) {
logger.info("configuring actors");
BeanValidationResult valres = new BeanValidationResult("ActorService", parameters);
for (Actor actor : name2actor.values()) {
String actorName = actor.getName();
- Map<String, Object> subparams = Util.translateToMap(actorName, parameters.get(actorName));
+ Map<String, Object> subparams = parameters.get(actorName);
if (subparams != null) {
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java
new file mode 100644
index 000000000..1e44a170c
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
+
+/**
+ * Actor that uses a bidirectional topic. The actor's parameters must be a
+ * {@link BidirectionalTopicActorParams}.
+ */
+public class BidirectionalTopicActor extends ActorImpl implements BidirectionalTopicManager {
+
+ /**
+ * Maps a pair of sink and source topic names to their bidirectional topic.
+ */
+ private final Map<Pair<String, String>, BidirectionalTopicHandler> params2topic = new ConcurrentHashMap<>();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param name actor's name
+ */
+ public BidirectionalTopicActor(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void doStart() {
+ params2topic.values().forEach(BidirectionalTopicHandler::start);
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() {
+ params2topic.values().forEach(BidirectionalTopicHandler::stop);
+ super.doStop();
+ }
+
+ @Override
+ protected void doShutdown() {
+ params2topic.values().forEach(BidirectionalTopicHandler::shutdown);
+ params2topic.clear();
+ super.doShutdown();
+ }
+
+ @Override
+ public BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic) {
+ Pair<String, String> key = Pair.of(sinkTopic, sourceTopic);
+
+ return params2topic.computeIfAbsent(key, pair -> {
+ try {
+ return makeTopicHandler(sinkTopic, sourceTopic);
+ } catch (BidirectionalTopicClientException e) {
+ throw new IllegalArgumentException(e);
+ }
+ });
+ }
+
+ /**
+ * Translates the parameters to a {@link BidirectionalTopicActorParams} and then
+ * creates a function that will extract operator-specific parameters.
+ */
+ @Override
+ protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+ String actorName = getName();
+
+ // @formatter:off
+ return Util.translate(actorName, actorParameters, BidirectionalTopicActorParams.class)
+ .doValidation(actorName)
+ .makeOperationParameters(actorName);
+ // @formatter:on
+ }
+
+ // may be overridden by junit tests
+
+ protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic)
+ throws BidirectionalTopicClientException {
+
+ return new BidirectionalTopicHandler(sinkTopic, sourceTopic);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java
index 6b584d7c6..f82015d6b 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java
@@ -24,40 +24,42 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import lombok.Getter;
-import org.apache.commons.lang3.tuple.Triple;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
-import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Operation that uses a Topic pair.
+ * Operation that uses a bidirectional topic.
*
* @param <S> response type
*/
@Getter
-public abstract class TopicPairOperation<Q, S> extends OperationPartial {
- private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
- private static final Coder coder = new StandardCoder();
+public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
+ private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
+
+ /**
+ * Response status.
+ */
+ public enum Status {
+ SUCCESS, FAILURE, STILL_WAITING
+ }
// fields extracted from the operator
- private final TopicPair topicPair;
+ private final BidirectionalTopicHandler topicHandler;
private final Forwarder forwarder;
- private final TopicPairParams pairParams;
+ private final BidirectionalTopicParams topicParams;
private final long timeoutMs;
/**
@@ -73,13 +75,14 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
* @param operator operator that created this operation
* @param clazz response class
*/
- public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
+ public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator,
+ Class<S> clazz) {
super(params, operator);
- this.topicPair = operator.getTopicPair();
+ this.topicHandler = operator.getTopicHandler();
this.forwarder = operator.getForwarder();
- this.pairParams = operator.getParams();
+ this.topicParams = operator.getParams();
this.responseClass = clazz;
- this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS);
+ this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS);
}
/**
@@ -101,18 +104,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
- new CompletableFuture<>();
final Executor executor = params.getExecutor();
// register a listener BEFORE publishing
- // @formatter:off
- TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
- (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
- // @formatter:on
-
- // TODO this currently only allows a single matching response
+ BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
+ OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+ if (latestOutcome != null) {
+ // final response - complete the controller
+ controller.completeAsync(() -> latestOutcome, executor);
+ }
+ };
forwarder.register(expectedKeyValues, listener);
@@ -128,16 +130,6 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
throw e;
}
-
- // once "future" completes, process the response, and then complete the controller
-
- // @formatter:off
- future.thenApplyAsync(
- triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
- executor)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
-
return controller;
}
@@ -175,12 +167,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
throw new IllegalArgumentException("cannot encode request", e);
}
- List<CommInfrastructure> list = topicPair.publish(json);
- if (list.isEmpty()) {
+ if (!topicHandler.send(json)) {
throw new IllegalStateException("nothing published");
}
- logTopicRequest(list, request);
+ logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
}
/**
@@ -190,15 +181,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
* @param outcome outcome to be populated
* @param response raw response to process
* @param scoResponse response, as a {@link StandardCoderObject}
- * @return the outcome
+ * @return the outcome, or {@code null} if still waiting for completion
*/
- protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
+ protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
StandardCoderObject scoResponse) {
logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
- logTopicResponse(infra, rawResponse);
+ logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
+ rawResponse);
+ // decode the response
S response;
if (responseClass == String.class) {
response = responseClass.cast(rawResponse);
@@ -216,17 +209,26 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
}
}
- if (!isSuccess(rawResponse, response)) {
- logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
- params.getRequestId());
- return setOutcome(outcome, PolicyResult.FAILURE);
- }
+ // check its status
+ switch (detmStatus(rawResponse, response)) {
+ case SUCCESS:
+ logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ setOutcome(outcome, PolicyResult.SUCCESS);
+ postProcessResponse(outcome, rawResponse, response);
+ return outcome;
- logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
- setOutcome(outcome, PolicyResult.SUCCESS);
- postProcessResponse(outcome, rawResponse, response);
+ case FAILURE:
+ logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return setOutcome(outcome, PolicyResult.FAILURE);
- return outcome;
+ case STILL_WAITING:
+ default:
+ logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return null;
+ }
}
/**
@@ -241,76 +243,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
}
/**
- * Determines if the response indicates success.
+ * Determines the status of the response.
*
* @param rawResponse raw response
* @param response decoded response
- * @return {@code true} if the response indicates success, {@code false} otherwise
- */
- protected abstract boolean isSuccess(String rawResponse, S response);
-
- /**
- * Logs a TOPIC request. If the request is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param infrastructures list of communication infrastructures on which it was
- * published
- * @param request request to be logged
- */
- protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
- if (infrastructures.isEmpty()) {
- return;
- }
-
- String json;
- try {
- if (request == null) {
- json = null;
- } else if (request instanceof String) {
- json = request.toString();
- } else {
- json = makeCoder().encode(request, true);
- }
-
- } catch (CoderException e) {
- logger.warn("cannot pretty-print request", e);
- json = request.toString();
- }
-
- for (CommInfrastructure infra : infrastructures) {
- logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
- }
- }
-
- /**
- * Logs a TOPIC response. If the response is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param infra communication infrastructure on which the response was received
- * @param response response to be logged
+ * @return the status of the response
*/
- protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
- String json;
- try {
- if (response == null) {
- json = null;
- } else if (response instanceof String) {
- json = response.toString();
- } else {
- json = makeCoder().encode(response, true);
- }
-
- } catch (CoderException e) {
- logger.warn("cannot pretty-print response", e);
- json = response.toString();
- }
-
- logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
- }
-
- // these may be overridden by junit tests
-
- protected Coder makeCoder() {
- return coder;
- }
+ protected abstract Status detmStatus(String rawResponse, S response);
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java
index 8ce013388..51689e49b 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java
@@ -28,24 +28,24 @@ import lombok.Getter;
import org.onap.policy.common.parameters.ValidationResult;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
/**
- * Operator that uses a pair of topics, one for publishing the request, and another for
- * receiving the response. Topic operators may share a {@link TopicPair}.
+ * Operator that uses a bidirectional topic. Topic operators may share a
+ * {@link BidirectionalTopicHandler}.
*/
-public abstract class TopicPairOperator extends OperatorPartial {
+public abstract class BidirectionalTopicOperator extends OperatorPartial {
/**
- * Manager from which to get the topic pair.
+ * Manager from which to get the topic handlers.
*/
- private final TopicPairManager pairManager;
+ private final BidirectionalTopicManager topicManager;
/**
* Keys used to extract the fields used to select responses for this operator.
@@ -62,13 +62,13 @@ public abstract class TopicPairOperator extends OperatorPartial {
* will not, thus operations may copy it.
*/
@Getter
- private TopicPairParams params;
+ private BidirectionalTopicParams params;
/**
- * Topic pair associated with the parameters.
+ * Topic handler associated with the parameters.
*/
@Getter
- private TopicPair topicPair;
+ private BidirectionalTopicHandler topicHandler;
/**
* Forwarder associated with the parameters.
@@ -82,27 +82,27 @@ public abstract class TopicPairOperator extends OperatorPartial {
*
* @param actorName name of the actor with which this operator is associated
* @param name operation name
- * @param pairManager manager from which to get the topic pair
+ * @param topicManager manager from which to get the topic handler
* @param selectorKeys keys used to extract the fields used to select responses for
* this operator
*/
- public TopicPairOperator(String actorName, String name, TopicPairManager pairManager,
+ public BidirectionalTopicOperator(String actorName, String name, BidirectionalTopicManager topicManager,
List<SelectorKey> selectorKeys) {
super(actorName, name);
- this.pairManager = pairManager;
+ this.topicManager = topicManager;
this.selectorKeys = selectorKeys;
}
@Override
protected void doConfigure(Map<String, Object> parameters) {
- params = Util.translate(getFullName(), parameters, TopicPairParams.class);
+ params = Util.translate(getFullName(), parameters, BidirectionalTopicParams.class);
ValidationResult result = params.validate(getFullName());
if (!result.isValid()) {
throw new ParameterValidationRuntimeException("invalid parameters", result);
}
- topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget());
- forwarder = topicPair.addForwarder(selectorKeys);
+ topicHandler = topicManager.getTopicHandler(params.getSinkTopic(), params.getSourceTopic());
+ forwarder = topicHandler.addForwarder(selectorKeys);
}
/**
@@ -112,19 +112,21 @@ public abstract class TopicPairOperator extends OperatorPartial {
* @param <S> response type
* @param actorName actor name
* @param operation operation name
- * @param pairManager manager from which to get the topic pair
+ * @param topicManager manager from which to get the topic handler
* @param operationMaker function to make an operation
* @param keys keys used to extract the fields used to select responses for this
* operator
* @return a new operator
*/
// @formatter:off
- public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
- BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker,
+ public static <Q, S> BidirectionalTopicOperator makeOperator(String actorName, String operation,
+ BidirectionalTopicManager topicManager,
+ BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
+ BidirectionalTopicOperation<Q, S>> operationMaker,
SelectorKey... keys) {
// @formatter:off
- return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker);
+ return makeOperator(actorName, operation, topicManager, Arrays.asList(keys), operationMaker);
}
/**
@@ -134,19 +136,21 @@ public abstract class TopicPairOperator extends OperatorPartial {
* @param <S> response type
* @param actorName actor name
* @param operation operation name
- * @param pairManager manager from which to get the topic pair
+ * @param topicManager manager from which to get the topic handler
* @param keys keys used to extract the fields used to select responses for
* this operator
* @param operationMaker function to make an operation
* @return a new operator
*/
// @formatter:off
- public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
+ public static <Q,S> BidirectionalTopicOperator makeOperator(String actorName, String operation,
+ BidirectionalTopicManager topicManager,
List<SelectorKey> keys,
- BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) {
+ BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
+ BidirectionalTopicOperation<Q,S>> operationMaker) {
// @formatter:on
- return new TopicPairOperator(actorName, operation, pairManager, keys) {
+ return new BidirectionalTopicOperator(actorName, operation, topicManager, keys) {
@Override
public synchronized Operation buildOperation(ControlLoopOperationParams params) {
return operationMaker.apply(params, this);
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
index ba75f0be6..f1829d79a 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
@@ -33,9 +33,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
-import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
@@ -52,7 +50,6 @@ import org.slf4j.LoggerFactory;
@Getter
public abstract class HttpOperation<T> extends OperationPartial {
private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class);
- private static final Coder coder = new StandardCoder();
/**
* Operator that created this operation.
@@ -171,7 +168,7 @@ public abstract class HttpOperation<T> extends OperationPartial {
String strResponse = HttpClient.getBody(rawResponse, String.class);
- logRestResponse(url, strResponse);
+ logMessage(EventType.IN, CommInfrastructure.REST, url, strResponse);
T response;
if (responseClass == String.class) {
@@ -224,63 +221,10 @@ public abstract class HttpOperation<T> extends OperationPartial {
return (rawResponse.getStatus() == 200);
}
- /**
- * Logs a REST request. If the request is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param url request URL
- * @param request request to be logged
- */
- public <Q> void logRestRequest(String url, Q request) {
- String json;
- try {
- if (request == null) {
- json = null;
- } else if (request instanceof String) {
- json = request.toString();
- } else {
- json = makeCoder().encode(request, true);
- }
-
- } catch (CoderException e) {
- logger.warn("cannot pretty-print request", e);
- json = request.toString();
- }
-
- NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json);
- logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
- }
-
- /**
- * Logs a REST response. If the response is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param url request URL
- * @param response response to be logged
- */
- public <S> void logRestResponse(String url, S response) {
- String json;
- try {
- if (response == null) {
- json = null;
- } else if (response instanceof String) {
- json = response.toString();
- } else {
- json = makeCoder().encode(response, true);
- }
-
- } catch (CoderException e) {
- logger.warn("cannot pretty-print response", e);
- json = response.toString();
- }
-
- NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json);
- logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json);
- }
-
- // these may be overridden by junit tests
-
- protected Coder makeCoder() {
- return coder;
+ @Override
+ public <Q> String logMessage(EventType direction, CommInfrastructure infra, String sink, Q request) {
+ String json = super.logMessage(direction, infra, sink, request);
+ NetLoggerUtil.log(direction, infra, sink, json);
+ return json;
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
index d00b88bb5..0b3497197 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
@@ -20,7 +20,12 @@
package org.onap.policy.controlloop.actorserviceprovider.impl;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
@@ -28,6 +33,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
@@ -53,8 +66,8 @@ import org.slf4j.LoggerFactory;
* be done to cancel that particular operation.
*/
public abstract class OperationPartial implements Operation {
-
private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
+ private static final Coder coder = new StandardCoder();
public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
// values extracted from the operator
@@ -470,103 +483,110 @@ public abstract class OperationPartial implements Operation {
* Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
* any outstanding futures when one completes.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled
*/
- protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) {
-
- // convert list to an array
- @SuppressWarnings("rawtypes")
- CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+ protected CompletableFuture<OperationOutcome> anyOf(
+ @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
- @SuppressWarnings("unchecked")
- CompletableFuture<OperationOutcome> result = anyOf(arrFutures);
- return result;
+ return anyOf(Arrays.asList(futureMakers));
}
/**
- * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
- * outstanding futures when one completes.
+ * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+ * any outstanding futures when one completes.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled. Similarly, when this future completes, any incomplete futures
+ * will be canceled
*/
protected CompletableFuture<OperationOutcome> anyOf(
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+ List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<OperationOutcome>[] futures =
+ attachFutures(controller, futureMakers, UnaryOperator.identity());
+
+ if (futures.length == 0) {
+ // no futures were started
+ return null;
+ }
if (futures.length == 1) {
return futures[0];
}
- final Executor executor = params.getExecutor();
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- attachFutures(controller, futures);
-
- // @formatter:off
- CompletableFuture.anyOf(futures)
- .thenApply(object -> (OperationOutcome) object)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
+ CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
+ .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
return controller;
}
/**
- * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
- * the futures if returned future is canceled. The future returns the "worst" outcome,
- * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled
*/
- protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) {
-
- // convert list to an array
- @SuppressWarnings("rawtypes")
- CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+ protected CompletableFuture<OperationOutcome> allOf(
+ @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
- @SuppressWarnings("unchecked")
- CompletableFuture<OperationOutcome> result = allOf(arrFutures);
- return result;
+ return allOf(Arrays.asList(futureMakers));
}
/**
- * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
- * futures if returned future is canceled. The future returns the "worst" outcome,
- * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled. Similarly, when this future completes, any incomplete futures
+ * will be canceled
*/
protected CompletableFuture<OperationOutcome> allOf(
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
-
- if (futures.length == 1) {
- return futures[0];
- }
+ List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- attachFutures(controller, futures);
+ Queue<OperationOutcome> outcomes = new LinkedList<>();
- OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+ CompletableFuture<OperationOutcome>[] futures =
+ attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
+ synchronized (outcomes) {
+ outcomes.add(outcome);
+ }
+ return outcome;
+ }));
- @SuppressWarnings("rawtypes")
- CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+ if (futures.length == 0) {
+ // no futures were started
+ return null;
+ }
- // record the outcomes of each future when it completes
- for (int count = 0; count < futures2.length; ++count) {
- final int count2 = count;
- futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
+ if (futures.length == 1) {
+ return futures[0];
}
// @formatter:off
- CompletableFuture.allOf(futures2)
+ CompletableFuture.allOf(futures)
.thenApply(unused -> combineOutcomes(outcomes))
.whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
// @formatter:on
@@ -575,22 +595,62 @@ public abstract class OperationPartial implements Operation {
}
/**
- * Attaches the given futures to the controller.
+ * Invokes the functions to create the futures and attaches them to the controller.
*
* @param controller master controller for all of the futures
- * @param futures futures to be attached to the controller
- */
- private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+ * @param futureMakers futures to be attached to the controller
+ * @param adorn function that "adorns" the future, possible adding onto its pipeline.
+ * Returns the adorned future
+ * @return an array of futures, possibly zero-length. If the array is of size one,
+ * then that one item should be returned instead of the controller
+ */
+ private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+ List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
+ UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
+
+ if (futureMakers.isEmpty()) {
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
+ return result;
+ }
- if (futures.length == 0) {
- throw new IllegalArgumentException("empty list of futures");
+ // the last, unadorned future that is created
+ CompletableFuture<OperationOutcome> lastFuture = null;
+
+ List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
+
+ // make each future
+ for (var maker : futureMakers) {
+ try {
+ CompletableFuture<OperationOutcome> future = maker.get();
+ if (future == null) {
+ continue;
+ }
+
+ // propagate "stop" to the future
+ controller.add(future);
+
+ futures.add(adorn.apply(future));
+
+ lastFuture = future;
+
+ } catch (RuntimeException e) {
+ logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
+ controller.cancel(false);
+ throw e;
+ }
}
- // attach each task
- for (CompletableFuture<OperationOutcome> future : futures) {
- controller.add(future);
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
+
+ if (result.length == 1) {
+ // special case - return the unadorned future
+ result[0] = lastFuture;
+ return result;
}
+
+ return futures.toArray(result);
}
/**
@@ -599,15 +659,13 @@ public abstract class OperationPartial implements Operation {
* @param outcomes outcomes to be examined
* @return the combined outcome
*/
- private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
+ private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
// identify the outcome with the highest priority
- OperationOutcome outcome = outcomes[0];
+ OperationOutcome outcome = outcomes.remove();
int priority = detmPriority(outcome);
- // start with "1", as we've already dealt with "0"
- for (int count = 1; count < outcomes.length; ++count) {
- OperationOutcome outcome2 = outcomes[count];
+ for (OperationOutcome outcome2 : outcomes) {
int priority2 = detmPriority(outcome2);
if (priority2 > priority) {
@@ -656,72 +714,114 @@ public abstract class OperationPartial implements Operation {
}
/**
- * Performs a task, after verifying that the controller is still running. Also checks
- * that the previous outcome was successful, if specified.
+ * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+ * not created until the previous task completes. The pipeline returns the outcome of
+ * the last task executed.
*
- * @param controller overall pipeline controller
- * @param checkSuccess {@code true} to check the previous outcome, {@code false}
- * otherwise
- * @param outcome outcome of the previous task
- * @param task task to be performed
- * @return the task, if everything checks out. Otherwise, it returns an incomplete
- * future and completes the controller instead
+ * @param futureMakers functions to make the futures
+ * @return a future to cancel the sequence or await the outcome
*/
- // @formatter:off
- protected CompletableFuture<OperationOutcome> doTask(
- PipelineControllerFuture<OperationOutcome> controller,
- boolean checkSuccess, OperationOutcome outcome,
- CompletableFuture<OperationOutcome> task) {
- // @formatter:on
+ protected CompletableFuture<OperationOutcome> sequence(
+ @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
- if (checkSuccess && !isSuccess(outcome)) {
- /*
- * must complete before canceling so that cancel() doesn't cause controller to
- * complete
- */
- controller.complete(outcome);
- task.cancel(false);
- return new CompletableFuture<>();
+ return sequence(Arrays.asList(futureMakers));
+ }
+
+ /**
+ * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+ * not created until the previous task completes. The pipeline returns the outcome of
+ * the last task executed.
+ *
+ * @param futureMakers functions to make the futures
+ * @return a future to cancel the sequence or await the outcome, or {@code null} if
+ * there were no tasks to perform
+ */
+ protected CompletableFuture<OperationOutcome> sequence(
+ List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+
+ Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
+
+ CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
+ if (nextTask == null) {
+ // no tasks
+ return null;
+ }
+
+ if (queue.isEmpty()) {
+ // only one task - just return it rather than wrapping it in a controller
+ return nextTask;
}
- return controller.wrap(task);
+ /*
+ * multiple tasks - need a controller to stop whichever task is currently
+ * executing
+ */
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ final Executor executor = params.getExecutor();
+
+ // @formatter:off
+ controller.wrap(nextTask)
+ .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ return controller;
}
/**
- * Performs a task, after verifying that the controller is still running. Also checks
- * that the previous outcome was successful, if specified.
+ * Executes the next task in the queue, if the previous outcome was successful.
*
- * @param controller overall pipeline controller
- * @param checkSuccess {@code true} to check the previous outcome, {@code false}
- * otherwise
- * @param task function to start the task to be performed
- * @return a function to perform the task. If everything checks out, then it returns
- * the task. Otherwise, it returns an incomplete future and completes the
- * controller instead
+ * @param controller pipeline controller
+ * @param taskQueue queue of tasks to be performed
+ * @return a future to execute the remaining tasks, or the current outcome, if it's a
+ * failure, or if there are no more tasks
*/
- // @formatter:off
- protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
PipelineControllerFuture<OperationOutcome> controller,
- boolean checkSuccess,
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
- // @formatter:on
+ Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
return outcome -> {
-
- if (!controller.isRunning()) {
- return new CompletableFuture<>();
+ if (!isSuccess(outcome)) {
+ // return the failure
+ return CompletableFuture.completedFuture(outcome);
}
- if (checkSuccess && !isSuccess(outcome)) {
- controller.complete(outcome);
- return new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
+ if (nextTask == null) {
+ // no tasks - just return the success
+ return CompletableFuture.completedFuture(outcome);
}
- return controller.wrap(task.apply(outcome));
+ // @formatter:off
+ return controller
+ .wrap(nextTask)
+ .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
+ // @formatter:on
};
}
/**
+ * Gets the next task from the queue, skipping those that are {@code null}.
+ *
+ * @param taskQueue task queue
+ * @return the next task, or {@code null} if the queue is now empty
+ */
+ private CompletableFuture<OperationOutcome> getNextTask(
+ Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
+
+ Supplier<CompletableFuture<OperationOutcome>> maker;
+
+ while ((maker = taskQueue.poll()) != null) {
+ CompletableFuture<OperationOutcome> future = maker.get();
+ if (future != null) {
+ return future;
+ }
+ }
+
+ return null;
+ }
+
+ /**
* Sets the start time of the operation and invokes the callback to indicate that the
* operation has started. Does nothing if the pipeline has been stopped.
* <p/>
@@ -809,6 +909,38 @@ public abstract class OperationPartial implements Operation {
return (thrown instanceof TimeoutException);
}
+ /**
+ * Logs a response. If the response is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param direction IN or OUT
+ * @param infra communication infrastructure on which it was published
+ * @param source source name (e.g., the URL or Topic name)
+ * @param response response to be logged
+ * @return the JSON text that was logged
+ */
+ public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
+ String json;
+ try {
+ if (response == null) {
+ json = null;
+ } else if (response instanceof String) {
+ json = response.toString();
+ } else {
+ json = makeCoder().encode(response, true);
+ }
+
+ } catch (CoderException e) {
+ String type = (direction == EventType.IN ? "response" : "request");
+ logger.warn("cannot pretty-print {}", type, e);
+ json = response.toString();
+ }
+
+ logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
+
+ return json;
+ }
+
// these may be overridden by subclasses or junit tests
/**
@@ -841,4 +973,10 @@ public abstract class OperationPartial implements Operation {
protected long getTimeoutMs(Integer timeoutSec) {
return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
}
+
+ // these may be overridden by junit tests
+
+ protected Coder makeCoder() {
+ return coder;
+ }
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java
deleted file mode 100644
index c3e1e5c4d..000000000
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.controlloop.actorserviceprovider.impl;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onap.policy.common.parameters.ValidationResult;
-import org.onap.policy.controlloop.actorserviceprovider.Util;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
-
-/**
- * Actor that uses a topic pair. The actor's parameters must be a
- * {@link TopicPairActorParams}.
- */
-public class TopicPairActor extends ActorImpl implements TopicPairManager {
-
- /**
- * Maps a topic source and target name to its topic pair.
- */
- private final Map<Pair<String, String>, TopicPair> params2topic = new ConcurrentHashMap<>();
-
-
- /**
- * Constructs the object.
- *
- * @param name actor's name
- */
- public TopicPairActor(String name) {
- super(name);
- }
-
- @Override
- protected void doStart() {
- params2topic.values().forEach(TopicPair::start);
- super.doStart();
- }
-
- @Override
- protected void doStop() {
- params2topic.values().forEach(TopicPair::stop);
- super.doStop();
- }
-
- @Override
- protected void doShutdown() {
- params2topic.values().forEach(TopicPair::shutdown);
- params2topic.clear();
- super.doShutdown();
- }
-
- @Override
- public TopicPair getTopicPair(String source, String target) {
- Pair<String, String> key = Pair.of(source, target);
- return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target));
- }
-
- /**
- * Translates the parameters to a {@link TopicPairActorParams} and then creates a
- * function that will extract operator-specific parameters.
- */
- @Override
- protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
- String actorName = getName();
-
- TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class);
- ValidationResult result = params.validate(getName());
- if (!result.isValid()) {
- throw new ParameterValidationRuntimeException("invalid parameters", result);
- }
-
- // create a map of the default parameters
- Map<String, Object> defaultParams = Util.translateToMap(getName(), params.getDefaults());
- Map<String, Map<String, Object>> operations = params.getOperation();
-
- return operationName -> {
- Map<String, Object> specificParams = operations.get(operationName);
- if (specificParams == null) {
- return null;
- }
-
- // start with a copy of defaults and overlay with specific
- Map<String, Object> subparams = new TreeMap<>(defaultParams);
- subparams.putAll(specificParams);
-
- return Util.translateToMap(getName() + "." + operationName, subparams);
- };
- }
-}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java
new file mode 100644
index 000000000..291aeeb23
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.parameters.annotations.Min;
+
+/**
+ * Parameters used by Actors whose Operators use bidirectional topic.
+ */
+@Getter
+@Setter
+@EqualsAndHashCode(callSuper = true)
+public class BidirectionalTopicActorParams extends CommonActorParams {
+
+ /*
+ * Optional, default values that are used if missing from the operation-specific
+ * parameters.
+ */
+
+ /**
+ * Sink topic name to which requests should be published.
+ */
+ private String sinkTopic;
+
+ /**
+ * Source topic name, from which to read responses.
+ */
+ private String sourceTopic;
+
+ /**
+ * Amount of time, in seconds, to wait for the HTTP request to complete. The default
+ * is 90 seconds.
+ */
+ @Min(1)
+ private int timeoutSec = 90;
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java
index 33fcf3052..cafca1fa6 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java
@@ -29,31 +29,33 @@ import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
/**
- * Parameters used by Operators that use a pair of Topics, one to publish requests and the
- * other to receive responses.
+ * Parameters used by Operators that use a bidirectional topic.
*/
@NotNull
@NotBlank
@Data
@Builder(toBuilder = true)
-public class TopicPairParams {
+public class BidirectionalTopicParams {
/**
- * Source topic end point, from which to read responses.
+ * Sink topic name to which requests should be published.
*/
- private String source;
+ private String sinkTopic;
/**
- * Name of the target topic end point to which requests should be published.
+ * Source topic name, from which to read responses.
*/
- private String target;
+ private String sourceTopic;
/**
- * Amount of time, in seconds to wait for the response. The default is five minutes.
+ * Amount of time, in seconds to wait for the response.
+ * <p/>
+ * Note: this should NOT have a default value, as it receives its default value from
+ * {@link BidirectionalTopicActorParams}.
*/
@Min(1)
- @Builder.Default
- private int timeoutSec = 300;
+ private int timeoutSec;
+
/**
* Validates the parameters.
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java
index 42a44ee9c..dc6f2b657 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java
@@ -21,37 +21,57 @@
package org.onap.policy.controlloop.actorserviceprovider.parameters;
import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-import org.onap.policy.common.parameters.BeanValidationResult;
+import java.util.TreeMap;
+import java.util.function.Function;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.common.parameters.BeanValidator;
import org.onap.policy.common.parameters.ValidationResult;
-import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
/**
- * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests
- * and the other to receive responses.
+ * Superclass for Actor parameters that have default values in "this" object, and
+ * operation-specific values in {@link #operation}.
*/
-@NotNull
-@NotBlank
-@Data
-@Builder
-public class TopicPairActorParams {
+@Getter
+@Setter
+@EqualsAndHashCode
+public class CommonActorParams {
/**
- * This contains the default parameters that are used when an operation doesn't
- * specify them. Note: each operation to be used must still have an entry in
- * {@link #operation}, even if it's empty. Otherwise, the given operation will not be
- * started.
+ * Maps the operation name to its parameters.
*/
- private TopicPairParams defaults;
+ @NotNull
+ protected Map<String, Map<String, Object>> operation;
+
/**
- * Maps an operation name to its individual parameters.
+ * Extracts a specific operation's parameters from "this".
+ *
+ * @param name name of the item containing "this"
+ * @return a function to extract an operation's parameters from "this". Note: the
+ * returned function is not thread-safe
*/
- private Map<String, Map<String, Object>> operation;
+ public Function<String, Map<String, Object>> makeOperationParameters(String name) {
+
+ Map<String, Object> defaultParams = Util.translateToMap(name, this);
+ defaultParams.remove("operation");
+
+ return operationName -> {
+ Map<String, Object> specificParams = operation.get(operationName);
+ if (specificParams == null) {
+ return null;
+ }
+
+ // start with a copy of defaults and overlay with specific
+ Map<String, Object> subparams = new TreeMap<>(defaultParams);
+ subparams.putAll(specificParams);
+ return Util.translateToMap(name + "." + operationName, subparams);
+ };
+ }
/**
* Validates the parameters.
@@ -60,7 +80,7 @@ public class TopicPairActorParams {
* @return "this"
* @throws IllegalArgumentException if the parameters are invalid
*/
- public TopicPairActorParams doValidation(String name) {
+ public CommonActorParams doValidation(String name) {
ValidationResult result = validate(name);
if (!result.isValid()) {
throw new ParameterValidationRuntimeException("invalid parameters", result);
@@ -77,17 +97,6 @@ public class TopicPairActorParams {
* @return the validation result
*/
public ValidationResult validate(String resultName) {
- BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
-
- if (defaults != null) {
- result.addResult(defaults.validate("defaults"));
- }
-
- // @formatter:off
- result.validateMap("operation", operation,
- (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
- // @formatter:on
-
- return result;
+ return new BeanValidator().validateTop(resultName, this);
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
index 275c8bc4e..d589e1d7e 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java
@@ -20,26 +20,23 @@
package org.onap.policy.controlloop.actorserviceprovider.parameters;
-import java.util.Map;
-import java.util.function.Function;
-import lombok.Data;
-import org.onap.policy.common.parameters.BeanValidationResult;
-import org.onap.policy.common.parameters.BeanValidator;
-import org.onap.policy.common.parameters.ValidationResult;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.common.parameters.annotations.Min;
-import org.onap.policy.common.parameters.annotations.NotBlank;
-import org.onap.policy.common.parameters.annotations.NotNull;
-import org.onap.policy.controlloop.actorserviceprovider.Util;
/**
- * Parameters used by Actors that connect to a server via HTTP. This contains the
- * parameters that are common to all of the operations. Only the path changes for each
- * operation, thus it includes a mapping from operation name to path.
+ * Parameters used by Actors that connect to a server via HTTP.
*/
-@Data
-@NotNull
-@NotBlank
-public class HttpActorParams {
+@Getter
+@Setter
+@EqualsAndHashCode(callSuper = true)
+public class HttpActorParams extends CommonActorParams {
+
+ /*
+ * Optional, default values that are used if missing from the operation-specific
+ * parameters.
+ */
/**
* Name of the HttpClient, as found in the HttpClientFactory.
@@ -47,66 +44,9 @@ public class HttpActorParams {
private String clientName;
/**
- * Amount of time, in seconds to wait for the HTTP request to complete, where zero
- * indicates that it should wait forever. The default is zero.
- */
- @Min(0)
- private int timeoutSec = 0;
-
- /**
- * Maps the operation name to its URI path.
- */
- private Map<String, String> path;
-
- /**
- * Extracts a specific operation's parameters from "this".
- *
- * @param name name of the item containing "this"
- * @return a function to extract an operation's parameters from "this". Note: the
- * returned function is not thread-safe
- */
- public Function<String, Map<String, Object>> makeOperationParameters(String name) {
- HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build();
-
- return operation -> {
- String subpath = path.get(operation);
- if (subpath == null) {
- return null;
- }
-
- subparams.setPath(subpath);
- return Util.translateToMap(name + "." + operation, subparams);
- };
- }
-
- /**
- * Validates the parameters.
- *
- * @param name name of the object containing these parameters
- * @return "this"
- * @throws IllegalArgumentException if the parameters are invalid
+ * Amount of time, in seconds, to wait for the HTTP request to complete. The default
+ * is 90 seconds.
*/
- public HttpActorParams doValidation(String name) {
- ValidationResult result = validate(name);
- if (!result.isValid()) {
- throw new ParameterValidationRuntimeException("invalid parameters", result);
- }
-
- return this;
- }
-
- /**
- * Validates the parameters.
- *
- * @param resultName name of the result
- *
- * @return the validation result
- */
- public ValidationResult validate(String resultName) {
- BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
-
- result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
-
- return result;
- }
+ @Min(1)
+ private int timeoutSec = 90;
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
index 93711c032..2d3ab8b54 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java
@@ -48,12 +48,13 @@ public class HttpParams {
private String path;
/**
- * Amount of time, in seconds to wait for the HTTP request to complete, where zero
- * indicates that it should wait forever. The default is zero.
+ * Amount of time, in seconds, to wait for the HTTP request to complete.
+ * <p/>
+ * Note: this should NOT have a default value, as it receives its default value from
+ * {@link HttpActorParams}.
*/
- @Min(0)
- @Builder.Default
- private int timeoutSec = 0;
+ @Min(1)
+ private int timeoutSec;
/**
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java
new file mode 100644
index 000000000..30ee1e2d0
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java
@@ -0,0 +1,79 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import java.util.List;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
+
+/**
+ * Handler for a bidirectional topic, supporting both publishing and forwarding of
+ * incoming messages.
+ */
+public class BidirectionalTopicHandler extends BidirectionalTopicClient {
+
+ /**
+ * Listener that will be attached to the topic to receive responses.
+ */
+ private final TopicListenerImpl listener = new TopicListenerImpl();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param sinkTopic sink topic name
+ * @param sourceTopic source topic name
+ * @throws BidirectionalTopicClientException if an error occurs
+ */
+ public BidirectionalTopicHandler(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
+ super(sinkTopic, sourceTopic);
+ }
+
+ /**
+ * Starts listening on the source topic(s).
+ */
+ public void start() {
+ getSource().register(listener);
+ }
+
+ /**
+ * Stops listening on the source topic(s).
+ */
+ public void stop() {
+ getSource().unregister(listener);
+ }
+
+ /**
+ * Stops listening on the source topic(s) and clears all of the forwarders.
+ */
+ public void shutdown() {
+ stop();
+ listener.shutdown();
+ }
+
+ public Forwarder addForwarder(SelectorKey... keys) {
+ return listener.addForwarder(keys);
+ }
+
+ public Forwarder addForwarder(List<SelectorKey> keys) {
+ return listener.addForwarder(keys);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java
index c351f95f6..10411875a 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java
@@ -21,17 +21,17 @@
package org.onap.policy.controlloop.actorserviceprovider.topic;
/**
- * Manages topic pairs.
+ * Manages bidirectional topics.
*/
@FunctionalInterface
-public interface TopicPairManager {
+public interface BidirectionalTopicManager {
/**
- * Gets the topic pair for the given parameters, creating one if it does not exist.
+ * Gets the topic handler for the given parameters, creating one if it does not exist.
*
- * @param source source topic name
- * @param target target topic name
- * @return the topic pair associated with the given source and target topics
+ * @param sinkTopic sink topic name
+ * @param sourceTopic source topic name
+ * @return the topic handler associated with the given sink and source topic names
*/
- TopicPair getTopicPair(String source, String target);
+ BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic);
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java
index 8e9109c9e..2d98b66fc 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java
@@ -24,8 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import java.util.function.BiConsumer;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.controlloop.actorserviceprovider.Util;
import org.slf4j.Logger;
@@ -43,7 +42,7 @@ public class Forwarder {
* Maps a set of field values to one or more listeners.
*/
// @formatter:off
- private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>>
+ private final Map<List<String>, Map<BiConsumer<String, StandardCoderObject>, String>>
values2listeners = new ConcurrentHashMap<>();
// @formatter:on
@@ -68,13 +67,13 @@ public class Forwarder {
* @param values field values of interest, in one-to-one correspondence with the keys
* @param listener listener to register
*/
- public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+ public void register(List<String> values, BiConsumer<String, StandardCoderObject> listener) {
if (keys.size() != values.size()) {
throw new IllegalArgumentException("key/value mismatch");
}
values2listeners.compute(values, (key, listeners) -> {
- Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners;
+ Map<BiConsumer<String, StandardCoderObject>, String> map = listeners;
if (map == null) {
map = new ConcurrentHashMap<>();
}
@@ -90,7 +89,7 @@ public class Forwarder {
* @param values field values of interest, in one-to-one correspondence with the keys
* @param listener listener to unregister
*/
- public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+ public void unregister(List<String> values, BiConsumer<String, StandardCoderObject> listener) {
values2listeners.computeIfPresent(values, (key, listeners) -> {
listeners.remove(listener);
return (listeners.isEmpty() ? null : listeners);
@@ -100,11 +99,10 @@ public class Forwarder {
/**
* Processes a message, forwarding it to the appropriate listeners, if any.
*
- * @param infra communication infrastructure on which the response was received
* @param textMessage original text message that was received
* @param scoMessage decoded text message
*/
- public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) {
+ public void onMessage(String textMessage, StandardCoderObject scoMessage) {
// extract the key values from the message
List<String> values = new ArrayList<>(keys.size());
for (SelectorKey key : keys) {
@@ -121,8 +119,7 @@ public class Forwarder {
}
// get the listeners for this set of values
- Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners =
- values2listeners.get(values);
+ Map<BiConsumer<String, StandardCoderObject>, String> listeners = values2listeners.get(values);
if (listeners == null) {
// no listeners for this particular list of values
return;
@@ -130,9 +127,9 @@ public class Forwarder {
// forward the message to each listener
- for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) {
+ for (BiConsumer<String, StandardCoderObject> listener : listeners.keySet()) {
try {
- listener.accept(infra, textMessage, scoMessage);
+ listener.accept(textMessage, scoMessage);
} catch (RuntimeException e) {
logger.warn("exception thrown by listener {}", Util.ident(listener), e);
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java
index eb805ca5d..fcb463518 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java
@@ -98,7 +98,7 @@ public class TopicListenerImpl implements TopicListener {
* them all take a crack at it.
*/
for (Forwarder forwarder : selector2forwarder.values()) {
- forwarder.onMessage(infra, message, object);
+ forwarder.onMessage(message, object);
}
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java
deleted file mode 100644
index c0cfe2571..000000000
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.controlloop.actorserviceprovider.topic;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A pair of topics, one of which is used to publish requests and the other to receive
- * responses.
- */
-public class TopicPair extends TopicListenerImpl {
- private static final Logger logger = LoggerFactory.getLogger(TopicPair.class);
-
- @Getter
- private final String source;
-
- @Getter
- private final String target;
-
- private final List<TopicSink> publishers;
- private final List<TopicSource> subscribers;
-
- /**
- * Constructs the object.
- *
- * @param source source topic name
- * @param target target topic name
- */
- public TopicPair(String source, String target) {
- this.source = source;
- this.target = target;
-
- publishers = getTopicEndpointManager().getTopicSinks(target);
- if (publishers.isEmpty()) {
- throw new IllegalArgumentException("no sinks for topic: " + target);
- }
-
- subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source));
- if (subscribers.isEmpty()) {
- throw new IllegalArgumentException("no sources for topic: " + source);
- }
- }
-
- /**
- * Starts listening on the source topic(s).
- */
- public void start() {
- subscribers.forEach(topic -> topic.register(this));
- }
-
- /**
- * Stops listening on the source topic(s).
- */
- public void stop() {
- subscribers.forEach(topic -> topic.unregister(this));
- }
-
- /**
- * Stops listening on the source topic(s) and clears all of the forwarders.
- */
- @Override
- public void shutdown() {
- stop();
- super.shutdown();
- }
-
- /**
- * Publishes a message to the target topic.
- *
- * @param message message to be published
- * @return a list of the infrastructures on which it was published
- */
- public List<CommInfrastructure> publish(String message) {
- List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size());
-
- for (TopicSink topic : publishers) {
- try {
- topic.send(message);
- infrastructures.add(topic.getTopicCommInfrastructure());
-
- } catch (RuntimeException e) {
- logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e);
- }
- }
-
- return infrastructures;
- }
-
- // these may be overridden by junit tests
-
- protected TopicEndpoint getTopicEndpointManager() {
- return TopicEndpointManager.getManager();
- }
-}