aboutsummaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors/actorServiceProvider/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java6
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java112
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java316
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java156
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java93
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java (renamed from models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java)22
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java141
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java57
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java104
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java122
-rw-r--r--models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java37
11 files changed, 1152 insertions, 14 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
index c4bf5f484..ba75f0be6 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java
@@ -181,9 +181,9 @@ public abstract class HttpOperation<T> extends OperationPartial {
try {
response = makeCoder().decode(strResponse, responseClass);
} catch (CoderException e) {
- logger.warn("{}.{} cannot decode response with http error code {} for {}", params.getActor(),
- params.getOperation(), rawResponse.getStatus(), params.getRequestId(), e);
- return setOutcome(outcome, PolicyResult.FAILURE_EXCEPTION);
+ logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
+ params.getRequestId(), e);
+ throw new IllegalArgumentException("cannot decode response");
}
}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java
new file mode 100644
index 000000000..c3e1e5c4d
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
+
+/**
+ * Actor that uses a topic pair. The actor's parameters must be a
+ * {@link TopicPairActorParams}.
+ */
+public class TopicPairActor extends ActorImpl implements TopicPairManager {
+
+ /**
+ * Maps a topic source and target name to its topic pair.
+ */
+ private final Map<Pair<String, String>, TopicPair> params2topic = new ConcurrentHashMap<>();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param name actor's name
+ */
+ public TopicPairActor(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void doStart() {
+ params2topic.values().forEach(TopicPair::start);
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() {
+ params2topic.values().forEach(TopicPair::stop);
+ super.doStop();
+ }
+
+ @Override
+ protected void doShutdown() {
+ params2topic.values().forEach(TopicPair::shutdown);
+ params2topic.clear();
+ super.doShutdown();
+ }
+
+ @Override
+ public TopicPair getTopicPair(String source, String target) {
+ Pair<String, String> key = Pair.of(source, target);
+ return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target));
+ }
+
+ /**
+ * Translates the parameters to a {@link TopicPairActorParams} and then creates a
+ * function that will extract operator-specific parameters.
+ */
+ @Override
+ protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) {
+ String actorName = getName();
+
+ TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class);
+ ValidationResult result = params.validate(getName());
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+
+ // create a map of the default parameters
+ Map<String, Object> defaultParams = Util.translateToMap(getName(), params.getDefaults());
+ Map<String, Map<String, Object>> operations = params.getOperation();
+
+ return operationName -> {
+ Map<String, Object> specificParams = operations.get(operationName);
+ if (specificParams == null) {
+ return null;
+ }
+
+ // start with a copy of defaults and overlay with specific
+ Map<String, Object> subparams = new TreeMap<>(defaultParams);
+ subparams.putAll(specificParams);
+
+ return Util.translateToMap(getName() + "." + operationName, subparams);
+ };
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java
new file mode 100644
index 000000000..6b584d7c6
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java
@@ -0,0 +1,316 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.apache.commons.lang3.tuple.Triple;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Operation that uses a Topic pair.
+ *
+ * @param <S> response type
+ */
+@Getter
+public abstract class TopicPairOperation<Q, S> extends OperationPartial {
+ private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
+ private static final Coder coder = new StandardCoder();
+
+ // fields extracted from the operator
+
+ private final TopicPair topicPair;
+ private final Forwarder forwarder;
+ private final TopicPairParams pairParams;
+ private final long timeoutMs;
+
+ /**
+ * Response class.
+ */
+ private final Class<S> responseClass;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param params operation parameters
+ * @param operator operator that created this operation
+ * @param clazz response class
+ */
+ public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
+ super(params, operator);
+ this.topicPair = operator.getTopicPair();
+ this.forwarder = operator.getForwarder();
+ this.pairParams = operator.getParams();
+ this.responseClass = clazz;
+ this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS);
+ }
+
+ /**
+ * If no timeout is specified, then it returns the default timeout.
+ */
+ @Override
+ protected long getTimeoutMs(Integer timeoutSec) {
+ // TODO move this method to the superclass
+ return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
+ }
+
+ /**
+ * Publishes the request and arranges to receive the response.
+ */
+ @Override
+ protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
+
+ final Q request = makeRequest(attempt);
+ final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
+
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
+ new CompletableFuture<>();
+ final Executor executor = params.getExecutor();
+
+ // register a listener BEFORE publishing
+
+ // @formatter:off
+ TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
+ (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
+ // @formatter:on
+
+ // TODO this currently only allows a single matching response
+
+ forwarder.register(expectedKeyValues, listener);
+
+ // ensure listener is unregistered if the controller is canceled
+ controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
+
+ // publish the request
+ try {
+ publishRequest(request);
+ } catch (RuntimeException e) {
+ logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
+ forwarder.unregister(expectedKeyValues, listener);
+ throw e;
+ }
+
+
+ // once "future" completes, process the response, and then complete the controller
+
+ // @formatter:off
+ future.thenApplyAsync(
+ triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
+ executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ return controller;
+ }
+
+ /**
+ * Makes the request.
+ *
+ * @param attempt operation attempt
+ * @return a new request
+ */
+ protected abstract Q makeRequest(int attempt);
+
+ /**
+ * Gets values, expected in the response, that should match the selector keys.
+ *
+ * @param attempt operation attempt
+ * @param request request to be published
+ * @return a list of the values to be matched by the selector keys
+ */
+ protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
+
+ /**
+ * Publishes the request. Encodes the request, if it is not already a String.
+ *
+ * @param request request to be published
+ */
+ protected void publishRequest(Q request) {
+ String json;
+ try {
+ if (request instanceof String) {
+ json = request.toString();
+ } else {
+ json = makeCoder().encode(request);
+ }
+ } catch (CoderException e) {
+ throw new IllegalArgumentException("cannot encode request", e);
+ }
+
+ List<CommInfrastructure> list = topicPair.publish(json);
+ if (list.isEmpty()) {
+ throw new IllegalStateException("nothing published");
+ }
+
+ logTopicRequest(list, request);
+ }
+
+ /**
+ * Processes a response.
+ *
+ * @param infra communication infrastructure on which the response was received
+ * @param outcome outcome to be populated
+ * @param response raw response to process
+ * @param scoResponse response, as a {@link StandardCoderObject}
+ * @return the outcome
+ */
+ protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
+ StandardCoderObject scoResponse) {
+
+ logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
+
+ logTopicResponse(infra, rawResponse);
+
+ S response;
+ if (responseClass == String.class) {
+ response = responseClass.cast(rawResponse);
+
+ } else if (responseClass == StandardCoderObject.class) {
+ response = responseClass.cast(scoResponse);
+
+ } else {
+ try {
+ response = makeCoder().decode(rawResponse, responseClass);
+ } catch (CoderException e) {
+ logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ throw new IllegalArgumentException("cannot decode response", e);
+ }
+ }
+
+ if (!isSuccess(rawResponse, response)) {
+ logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return setOutcome(outcome, PolicyResult.FAILURE);
+ }
+
+ logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
+ setOutcome(outcome, PolicyResult.SUCCESS);
+ postProcessResponse(outcome, rawResponse, response);
+
+ return outcome;
+ }
+
+ /**
+ * Processes a successful response.
+ *
+ * @param outcome outcome to be populated
+ * @param rawResponse raw response
+ * @param response decoded response
+ */
+ protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
+ // do nothing
+ }
+
+ /**
+ * Determines if the response indicates success.
+ *
+ * @param rawResponse raw response
+ * @param response decoded response
+ * @return {@code true} if the response indicates success, {@code false} otherwise
+ */
+ protected abstract boolean isSuccess(String rawResponse, S response);
+
+ /**
+ * Logs a TOPIC request. If the request is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param infrastructures list of communication infrastructures on which it was
+ * published
+ * @param request request to be logged
+ */
+ protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
+ if (infrastructures.isEmpty()) {
+ return;
+ }
+
+ String json;
+ try {
+ if (request == null) {
+ json = null;
+ } else if (request instanceof String) {
+ json = request.toString();
+ } else {
+ json = makeCoder().encode(request, true);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot pretty-print request", e);
+ json = request.toString();
+ }
+
+ for (CommInfrastructure infra : infrastructures) {
+ logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
+ }
+ }
+
+ /**
+ * Logs a TOPIC response. If the response is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param infra communication infrastructure on which the response was received
+ * @param response response to be logged
+ */
+ protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
+ String json;
+ try {
+ if (response == null) {
+ json = null;
+ } else if (response instanceof String) {
+ json = response.toString();
+ } else {
+ json = makeCoder().encode(response, true);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot pretty-print response", e);
+ json = response.toString();
+ }
+
+ logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
+ }
+
+ // these may be overridden by junit tests
+
+ protected Coder makeCoder() {
+ return coder;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java
new file mode 100644
index 000000000..8ce013388
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java
@@ -0,0 +1,156 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.impl;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import lombok.Getter;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
+import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
+
+/**
+ * Operator that uses a pair of topics, one for publishing the request, and another for
+ * receiving the response. Topic operators may share a {@link TopicPair}.
+ */
+public abstract class TopicPairOperator extends OperatorPartial {
+
+ /**
+ * Manager from which to get the topic pair.
+ */
+ private final TopicPairManager pairManager;
+
+ /**
+ * Keys used to extract the fields used to select responses for this operator.
+ */
+ private final List<SelectorKey> selectorKeys;
+
+ /*
+ * The remaining fields are initialized when configure() is invoked, thus they may
+ * change.
+ */
+
+ /**
+ * Current parameters. While {@link params} may change, the values contained within it
+ * will not, thus operations may copy it.
+ */
+ @Getter
+ private TopicPairParams params;
+
+ /**
+ * Topic pair associated with the parameters.
+ */
+ @Getter
+ private TopicPair topicPair;
+
+ /**
+ * Forwarder associated with the parameters.
+ */
+ @Getter
+ private Forwarder forwarder;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param actorName name of the actor with which this operator is associated
+ * @param name operation name
+ * @param pairManager manager from which to get the topic pair
+ * @param selectorKeys keys used to extract the fields used to select responses for
+ * this operator
+ */
+ public TopicPairOperator(String actorName, String name, TopicPairManager pairManager,
+ List<SelectorKey> selectorKeys) {
+ super(actorName, name);
+ this.pairManager = pairManager;
+ this.selectorKeys = selectorKeys;
+ }
+
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ params = Util.translate(getFullName(), parameters, TopicPairParams.class);
+ ValidationResult result = params.validate(getFullName());
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+
+ topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget());
+ forwarder = topicPair.addForwarder(selectorKeys);
+ }
+
+ /**
+ * Makes an operator that will construct operations.
+ *
+ * @param <Q> request type
+ * @param <S> response type
+ * @param actorName actor name
+ * @param operation operation name
+ * @param pairManager manager from which to get the topic pair
+ * @param operationMaker function to make an operation
+ * @param keys keys used to extract the fields used to select responses for this
+ * operator
+ * @return a new operator
+ */
+ // @formatter:off
+ public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
+ BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker,
+ SelectorKey... keys) {
+ // @formatter:off
+
+ return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker);
+ }
+
+ /**
+ * Makes an operator that will construct operations.
+ *
+ * @param <Q> request type
+ * @param <S> response type
+ * @param actorName actor name
+ * @param operation operation name
+ * @param pairManager manager from which to get the topic pair
+ * @param keys keys used to extract the fields used to select responses for
+ * this operator
+ * @param operationMaker function to make an operation
+ * @return a new operator
+ */
+ // @formatter:off
+ public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
+ List<SelectorKey> keys,
+ BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) {
+ // @formatter:on
+
+ return new TopicPairOperator(actorName, operation, pairManager, keys) {
+ @Override
+ public synchronized Operation buildOperation(ControlLoopOperationParams params) {
+ return operationMaker.apply(params, this);
+ }
+ };
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java
new file mode 100644
index 000000000..42a44ee9c
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java
@@ -0,0 +1,93 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.parameters;
+
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+import org.onap.policy.common.parameters.BeanValidationResult;
+import org.onap.policy.common.parameters.BeanValidator;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.parameters.annotations.NotBlank;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+/**
+ * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests
+ * and the other to receive responses.
+ */
+@NotNull
+@NotBlank
+@Data
+@Builder
+public class TopicPairActorParams {
+
+ /**
+ * This contains the default parameters that are used when an operation doesn't
+ * specify them. Note: each operation to be used must still have an entry in
+ * {@link #operation}, even if it's empty. Otherwise, the given operation will not be
+ * started.
+ */
+ private TopicPairParams defaults;
+
+ /**
+ * Maps an operation name to its individual parameters.
+ */
+ private Map<String, Map<String, Object>> operation;
+
+
+ /**
+ * Validates the parameters.
+ *
+ * @param name name of the object containing these parameters
+ * @return "this"
+ * @throws IllegalArgumentException if the parameters are invalid
+ */
+ public TopicPairActorParams doValidation(String name) {
+ ValidationResult result = validate(name);
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+
+ return this;
+ }
+
+ /**
+ * Validates the parameters.
+ *
+ * @param resultName name of the result
+ *
+ * @return the validation result
+ */
+ public ValidationResult validate(String resultName) {
+ BeanValidationResult result = new BeanValidator().validateTop(resultName, this);
+
+ if (defaults != null) {
+ result.addResult(defaults.validate("defaults"));
+ }
+
+ // @formatter:off
+ result.validateMap("operation", operation,
+ (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue()));
+ // @formatter:on
+
+ return result;
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java
index e6ba7298a..33fcf3052 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java
@@ -29,34 +29,34 @@ import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
/**
- * Parameters used by Operators that connect to a server via DMaaP.
+ * Parameters used by Operators that use a pair of Topics, one to publish requests and the
+ * other to receive responses.
*/
@NotNull
@NotBlank
@Data
@Builder(toBuilder = true)
-public class TopicParams {
+public class TopicPairParams {
/**
- * Name of the target topic end point to which requests should be published.
+ * Source topic end point, from which to read responses.
*/
- private String target;
+ private String source;
/**
- * Source topic end point, from which to read responses.
+ * Name of the target topic end point to which requests should be published.
*/
- private String source;
+ private String target;
/**
- * Amount of time, in seconds to wait for the response, where zero indicates that it
- * should wait forever. The default is zero.
+ * Amount of time, in seconds to wait for the response. The default is five minutes.
*/
- @Min(0)
+ @Min(1)
@Builder.Default
- private int timeoutSec = 0;
+ private int timeoutSec = 300;
/**
- * Validates both the publisher and the subscriber parameters.
+ * Validates the parameters.
*
* @param resultName name of the result
*
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java
new file mode 100644
index 000000000..8e9109c9e
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java
@@ -0,0 +1,141 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forwarder that selectively forwards message to listeners based on the content of the
+ * message. Each forwarder is associated with a single set of selector keys. Listeners are
+ * then registered with that forwarder for a particular set of values for the given keys.
+ */
+public class Forwarder {
+ private static final Logger logger = LoggerFactory.getLogger(Forwarder.class);
+
+ /**
+ * Maps a set of field values to one or more listeners.
+ */
+ // @formatter:off
+ private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>>
+ values2listeners = new ConcurrentHashMap<>();
+ // @formatter:on
+
+ /**
+ * Keys used to extract the field values from the {@link StandardCoderObject}.
+ */
+ private final List<SelectorKey> keys;
+
+ /**
+ * Constructs the object.
+ *
+ * @param keys keys used to extract the field's value from the
+ * {@link StandardCoderObject}
+ */
+ public Forwarder(List<SelectorKey> keys) {
+ this.keys = keys;
+ }
+
+ /**
+ * Registers a listener for messages containing the given field values.
+ *
+ * @param values field values of interest, in one-to-one correspondence with the keys
+ * @param listener listener to register
+ */
+ public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+ if (keys.size() != values.size()) {
+ throw new IllegalArgumentException("key/value mismatch");
+ }
+
+ values2listeners.compute(values, (key, listeners) -> {
+ Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners;
+ if (map == null) {
+ map = new ConcurrentHashMap<>();
+ }
+
+ map.put(listener, "");
+ return map;
+ });
+ }
+
+ /**
+ * Unregisters a listener for messages containing the given field values.
+ *
+ * @param values field values of interest, in one-to-one correspondence with the keys
+ * @param listener listener to unregister
+ */
+ public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+ values2listeners.computeIfPresent(values, (key, listeners) -> {
+ listeners.remove(listener);
+ return (listeners.isEmpty() ? null : listeners);
+ });
+ }
+
+ /**
+ * Processes a message, forwarding it to the appropriate listeners, if any.
+ *
+ * @param infra communication infrastructure on which the response was received
+ * @param textMessage original text message that was received
+ * @param scoMessage decoded text message
+ */
+ public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) {
+ // extract the key values from the message
+ List<String> values = new ArrayList<>(keys.size());
+ for (SelectorKey key : keys) {
+ String value = key.extractField(scoMessage);
+ if (value == null) {
+ /*
+ * No value for this field, so this message is not relevant to this
+ * forwarder.
+ */
+ return;
+ }
+
+ values.add(value);
+ }
+
+ // get the listeners for this set of values
+ Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners =
+ values2listeners.get(values);
+ if (listeners == null) {
+ // no listeners for this particular list of values
+ return;
+ }
+
+
+ // forward the message to each listener
+ for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) {
+ try {
+ listener.accept(infra, textMessage, scoMessage);
+ } catch (RuntimeException e) {
+ logger.warn("exception thrown by listener {}", Util.ident(listener), e);
+ }
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java
new file mode 100644
index 000000000..fc5727395
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import lombok.EqualsAndHashCode;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+
+/**
+ * Selector key, which contains a hierarchical list of Strings and Integers that are used
+ * to extract the content of a field, typically from a {@link StandardCoderObject}.
+ */
+@EqualsAndHashCode
+public class SelectorKey {
+
+ /**
+ * Names and indices used to extract the field's value.
+ */
+ private final Object[] fieldIdentifiers;
+
+ /**
+ * Constructs the object.
+ *
+ * @param fieldIdentifiers names and indices used to extract the field's value
+ */
+ public SelectorKey(Object... fieldIdentifiers) {
+ this.fieldIdentifiers = fieldIdentifiers;
+ }
+
+ /**
+ * Extracts the given field from an object.
+ *
+ * @param object object from which to extract the field
+ * @return the extracted value, or {@code null} if the object does not contain the
+ * field
+ */
+ public String extractField(StandardCoderObject object) {
+ return object.getString(fieldIdentifiers);
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java
new file mode 100644
index 000000000..eb805ca5d
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A topic listener. When a message arrives on a topic, it is forwarded to listeners based
+ * on the content of fields found within the message. However, depending on the message
+ * type, the relevant fields might be found in different places within the message's
+ * object hierarchy. For each different list of keys, this class maintains a
+ * {@link Forwarder}, which is used to forward the message to all relevant listeners.
+ * <p/>
+ * Once a selector has been added, it is not removed until {@link #shutdown()} is invoked.
+ * As selectors are typically only added by Operators, and not by individual Operations,
+ * this should not pose a problem.
+ */
+public class TopicListenerImpl implements TopicListener {
+ private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class);
+ private static StandardCoder coder = new StandardCoder();
+
+ /**
+ * Maps selector to a forwarder.
+ */
+ private final Map<List<SelectorKey>, Forwarder> selector2forwarder = new ConcurrentHashMap<>();
+
+
+ /**
+ * Removes all forwarders.
+ */
+ public void shutdown() {
+ selector2forwarder.clear();
+ }
+
+ /**
+ * Adds a forwarder, if it doesn't already exist.
+ *
+ * @param keys the selector keys
+ * @return the forwarder associated with the given selector keys
+ */
+ public Forwarder addForwarder(SelectorKey... keys) {
+ return addForwarder(Arrays.asList(keys));
+ }
+
+ /**
+ * Adds a forwarder, if it doesn't already exist.
+ *
+ * @param keys the selector keys
+ * @return the forwarder associated with the given selector keys
+ */
+ public Forwarder addForwarder(List<SelectorKey> keys) {
+ return selector2forwarder.computeIfAbsent(keys, key -> new Forwarder(keys));
+ }
+
+ /**
+ * Decodes the message and then forwards it to each forwarder for processing.
+ */
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, String message) {
+ StandardCoderObject object;
+ try {
+ object = coder.decode(message, StandardCoderObject.class);
+ } catch (CoderException e) {
+ logger.warn("cannot decode message", e);
+ return;
+ }
+
+ /*
+ * We don't know which selector is appropriate for the message, so we just let
+ * them all take a crack at it.
+ */
+ for (Forwarder forwarder : selector2forwarder.values()) {
+ forwarder.onMessage(infra, message, object);
+ }
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java
new file mode 100644
index 000000000..c0cfe2571
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java
@@ -0,0 +1,122 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A pair of topics, one of which is used to publish requests and the other to receive
+ * responses.
+ */
+public class TopicPair extends TopicListenerImpl {
+ private static final Logger logger = LoggerFactory.getLogger(TopicPair.class);
+
+ @Getter
+ private final String source;
+
+ @Getter
+ private final String target;
+
+ private final List<TopicSink> publishers;
+ private final List<TopicSource> subscribers;
+
+ /**
+ * Constructs the object.
+ *
+ * @param source source topic name
+ * @param target target topic name
+ */
+ public TopicPair(String source, String target) {
+ this.source = source;
+ this.target = target;
+
+ publishers = getTopicEndpointManager().getTopicSinks(target);
+ if (publishers.isEmpty()) {
+ throw new IllegalArgumentException("no sinks for topic: " + target);
+ }
+
+ subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source));
+ if (subscribers.isEmpty()) {
+ throw new IllegalArgumentException("no sources for topic: " + source);
+ }
+ }
+
+ /**
+ * Starts listening on the source topic(s).
+ */
+ public void start() {
+ subscribers.forEach(topic -> topic.register(this));
+ }
+
+ /**
+ * Stops listening on the source topic(s).
+ */
+ public void stop() {
+ subscribers.forEach(topic -> topic.unregister(this));
+ }
+
+ /**
+ * Stops listening on the source topic(s) and clears all of the forwarders.
+ */
+ @Override
+ public void shutdown() {
+ stop();
+ super.shutdown();
+ }
+
+ /**
+ * Publishes a message to the target topic.
+ *
+ * @param message message to be published
+ * @return a list of the infrastructures on which it was published
+ */
+ public List<CommInfrastructure> publish(String message) {
+ List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size());
+
+ for (TopicSink topic : publishers) {
+ try {
+ topic.send(message);
+ infrastructures.add(topic.getTopicCommInfrastructure());
+
+ } catch (RuntimeException e) {
+ logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e);
+ }
+ }
+
+ return infrastructures;
+ }
+
+ // these may be overridden by junit tests
+
+ protected TopicEndpoint getTopicEndpointManager() {
+ return TopicEndpointManager.getManager();
+ }
+}
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java
new file mode 100644
index 000000000..c351f95f6
--- /dev/null
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actorserviceprovider.topic;
+
+/**
+ * Manages topic pairs.
+ */
+@FunctionalInterface
+public interface TopicPairManager {
+
+ /**
+ * Gets the topic pair for the given parameters, creating one if it does not exist.
+ *
+ * @param source source topic name
+ * @param target target topic name
+ * @return the topic pair associated with the given source and target topics
+ */
+ TopicPair getTopicPair(String source, String target);
+}