diff options
author | Jim Hahn <jrh3@att.com> | 2020-02-13 12:34:46 -0500 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2020-02-13 20:31:33 -0500 |
commit | 8c6859a5a095f4c98267eac9b051be83f86db122 (patch) | |
tree | 28786ab8c8c4425e6fdd69f52cea903000039eed /models-interactions/model-actors/actorServiceProvider/src/main | |
parent | 708a8b0cc4f4d828976d20747a222aac917c6b38 (diff) |
Add Topic Actor superclasses
Issue-ID: POLICY-2363
Change-Id: I5d29d85f6c5f40fb6c8f1bf678d9c718760a7558
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src/main')
11 files changed, 1152 insertions, 14 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java index c4bf5f484..ba75f0be6 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java @@ -181,9 +181,9 @@ public abstract class HttpOperation<T> extends OperationPartial { try { response = makeCoder().decode(strResponse, responseClass); } catch (CoderException e) { - logger.warn("{}.{} cannot decode response with http error code {} for {}", params.getActor(), - params.getOperation(), rawResponse.getStatus(), params.getRequestId(), e); - return setOutcome(outcome, PolicyResult.FAILURE_EXCEPTION); + logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(), + params.getRequestId(), e); + throw new IllegalArgumentException("cannot decode response"); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java new file mode 100644 index 000000000..c3e1e5c4d --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; + +/** + * Actor that uses a topic pair. The actor's parameters must be a + * {@link TopicPairActorParams}. + */ +public class TopicPairActor extends ActorImpl implements TopicPairManager { + + /** + * Maps a topic source and target name to its topic pair. + */ + private final Map<Pair<String, String>, TopicPair> params2topic = new ConcurrentHashMap<>(); + + + /** + * Constructs the object. + * + * @param name actor's name + */ + public TopicPairActor(String name) { + super(name); + } + + @Override + protected void doStart() { + params2topic.values().forEach(TopicPair::start); + super.doStart(); + } + + @Override + protected void doStop() { + params2topic.values().forEach(TopicPair::stop); + super.doStop(); + } + + @Override + protected void doShutdown() { + params2topic.values().forEach(TopicPair::shutdown); + params2topic.clear(); + super.doShutdown(); + } + + @Override + public TopicPair getTopicPair(String source, String target) { + Pair<String, String> key = Pair.of(source, target); + return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target)); + } + + /** + * Translates the parameters to a {@link TopicPairActorParams} and then creates a + * function that will extract operator-specific parameters. + */ + @Override + protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { + String actorName = getName(); + + TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class); + ValidationResult result = params.validate(getName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + // create a map of the default parameters + Map<String, Object> defaultParams = Util.translateToMap(getName(), params.getDefaults()); + Map<String, Map<String, Object>> operations = params.getOperation(); + + return operationName -> { + Map<String, Object> specificParams = operations.get(operationName); + if (specificParams == null) { + return null; + } + + // start with a copy of defaults and overlay with specific + Map<String, Object> subparams = new TreeMap<>(defaultParams); + subparams.putAll(specificParams); + + return Util.translateToMap(getName() + "." + operationName, subparams); + }; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java new file mode 100644 index 000000000..6b584d7c6 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java @@ -0,0 +1,316 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.apache.commons.lang3.tuple.Triple; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Operation that uses a Topic pair. + * + * @param <S> response type + */ +@Getter +public abstract class TopicPairOperation<Q, S> extends OperationPartial { + private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class); + private static final Coder coder = new StandardCoder(); + + // fields extracted from the operator + + private final TopicPair topicPair; + private final Forwarder forwarder; + private final TopicPairParams pairParams; + private final long timeoutMs; + + /** + * Response class. + */ + private final Class<S> responseClass; + + + /** + * Constructs the object. + * + * @param params operation parameters + * @param operator operator that created this operation + * @param clazz response class + */ + public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) { + super(params, operator); + this.topicPair = operator.getTopicPair(); + this.forwarder = operator.getForwarder(); + this.pairParams = operator.getParams(); + this.responseClass = clazz; + this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS); + } + + /** + * If no timeout is specified, then it returns the default timeout. + */ + @Override + protected long getTimeoutMs(Integer timeoutSec) { + // TODO move this method to the superclass + return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec)); + } + + /** + * Publishes the request and arranges to receive the response. + */ + @Override + protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) { + + final Q request = makeRequest(attempt); + final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request); + + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future = + new CompletableFuture<>(); + final Executor executor = params.getExecutor(); + + // register a listener BEFORE publishing + + // @formatter:off + TriConsumer<CommInfrastructure, String, StandardCoderObject> listener = + (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse)); + // @formatter:on + + // TODO this currently only allows a single matching response + + forwarder.register(expectedKeyValues, listener); + + // ensure listener is unregistered if the controller is canceled + controller.add(() -> forwarder.unregister(expectedKeyValues, listener)); + + // publish the request + try { + publishRequest(request); + } catch (RuntimeException e) { + logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId()); + forwarder.unregister(expectedKeyValues, listener); + throw e; + } + + + // once "future" completes, process the response, and then complete the controller + + // @formatter:off + future.thenApplyAsync( + triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()), + executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; + } + + /** + * Makes the request. + * + * @param attempt operation attempt + * @return a new request + */ + protected abstract Q makeRequest(int attempt); + + /** + * Gets values, expected in the response, that should match the selector keys. + * + * @param attempt operation attempt + * @param request request to be published + * @return a list of the values to be matched by the selector keys + */ + protected abstract List<String> getExpectedKeyValues(int attempt, Q request); + + /** + * Publishes the request. Encodes the request, if it is not already a String. + * + * @param request request to be published + */ + protected void publishRequest(Q request) { + String json; + try { + if (request instanceof String) { + json = request.toString(); + } else { + json = makeCoder().encode(request); + } + } catch (CoderException e) { + throw new IllegalArgumentException("cannot encode request", e); + } + + List<CommInfrastructure> list = topicPair.publish(json); + if (list.isEmpty()) { + throw new IllegalStateException("nothing published"); + } + + logTopicRequest(list, request); + } + + /** + * Processes a response. + * + * @param infra communication infrastructure on which the response was received + * @param outcome outcome to be populated + * @param response raw response to process + * @param scoResponse response, as a {@link StandardCoderObject} + * @return the outcome + */ + protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse, + StandardCoderObject scoResponse) { + + logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); + + logTopicResponse(infra, rawResponse); + + S response; + if (responseClass == String.class) { + response = responseClass.cast(rawResponse); + + } else if (responseClass == StandardCoderObject.class) { + response = responseClass.cast(scoResponse); + + } else { + try { + response = makeCoder().decode(rawResponse, responseClass); + } catch (CoderException e) { + logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + throw new IllegalArgumentException("cannot decode response", e); + } + } + + if (!isSuccess(rawResponse, response)) { + logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + return setOutcome(outcome, PolicyResult.FAILURE); + } + + logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); + setOutcome(outcome, PolicyResult.SUCCESS); + postProcessResponse(outcome, rawResponse, response); + + return outcome; + } + + /** + * Processes a successful response. + * + * @param outcome outcome to be populated + * @param rawResponse raw response + * @param response decoded response + */ + protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) { + // do nothing + } + + /** + * Determines if the response indicates success. + * + * @param rawResponse raw response + * @param response decoded response + * @return {@code true} if the response indicates success, {@code false} otherwise + */ + protected abstract boolean isSuccess(String rawResponse, S response); + + /** + * Logs a TOPIC request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param infrastructures list of communication infrastructures on which it was + * published + * @param request request to be logged + */ + protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) { + if (infrastructures.isEmpty()) { + return; + } + + String json; + try { + if (request == null) { + json = null; + } else if (request instanceof String) { + json = request.toString(); + } else { + json = makeCoder().encode(request, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print request", e); + json = request.toString(); + } + + for (CommInfrastructure infra : infrastructures) { + logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json); + } + } + + /** + * Logs a TOPIC response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param infra communication infrastructure on which the response was received + * @param response response to be logged + */ + protected <T> void logTopicResponse(CommInfrastructure infra, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = makeCoder().encode(response, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print response", e); + json = response.toString(); + } + + logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json); + } + + // these may be overridden by junit tests + + protected Coder makeCoder() { + return coder; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java new file mode 100644 index 000000000..8ce013388 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java @@ -0,0 +1,156 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import lombok.Getter; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; +import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; + +/** + * Operator that uses a pair of topics, one for publishing the request, and another for + * receiving the response. Topic operators may share a {@link TopicPair}. + */ +public abstract class TopicPairOperator extends OperatorPartial { + + /** + * Manager from which to get the topic pair. + */ + private final TopicPairManager pairManager; + + /** + * Keys used to extract the fields used to select responses for this operator. + */ + private final List<SelectorKey> selectorKeys; + + /* + * The remaining fields are initialized when configure() is invoked, thus they may + * change. + */ + + /** + * Current parameters. While {@link params} may change, the values contained within it + * will not, thus operations may copy it. + */ + @Getter + private TopicPairParams params; + + /** + * Topic pair associated with the parameters. + */ + @Getter + private TopicPair topicPair; + + /** + * Forwarder associated with the parameters. + */ + @Getter + private Forwarder forwarder; + + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + * @param pairManager manager from which to get the topic pair + * @param selectorKeys keys used to extract the fields used to select responses for + * this operator + */ + public TopicPairOperator(String actorName, String name, TopicPairManager pairManager, + List<SelectorKey> selectorKeys) { + super(actorName, name); + this.pairManager = pairManager; + this.selectorKeys = selectorKeys; + } + + @Override + protected void doConfigure(Map<String, Object> parameters) { + params = Util.translate(getFullName(), parameters, TopicPairParams.class); + ValidationResult result = params.validate(getFullName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget()); + forwarder = topicPair.addForwarder(selectorKeys); + } + + /** + * Makes an operator that will construct operations. + * + * @param <Q> request type + * @param <S> response type + * @param actorName actor name + * @param operation operation name + * @param pairManager manager from which to get the topic pair + * @param operationMaker function to make an operation + * @param keys keys used to extract the fields used to select responses for this + * operator + * @return a new operator + */ + // @formatter:off + public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, + BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker, + SelectorKey... keys) { + // @formatter:off + + return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker); + } + + /** + * Makes an operator that will construct operations. + * + * @param <Q> request type + * @param <S> response type + * @param actorName actor name + * @param operation operation name + * @param pairManager manager from which to get the topic pair + * @param keys keys used to extract the fields used to select responses for + * this operator + * @param operationMaker function to make an operation + * @return a new operator + */ + // @formatter:off + public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, + List<SelectorKey> keys, + BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) { + // @formatter:on + + return new TopicPairOperator(actorName, operation, pairManager, keys) { + @Override + public synchronized Operation buildOperation(ControlLoopOperationParams params) { + return operationMaker.apply(params, this); + } + }; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java new file mode 100644 index 000000000..42a44ee9c --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java @@ -0,0 +1,93 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import java.util.Map; +import lombok.Builder; +import lombok.Data; +import org.onap.policy.common.parameters.BeanValidationResult; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.parameters.annotations.NotBlank; +import org.onap.policy.common.parameters.annotations.NotNull; + +/** + * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests + * and the other to receive responses. + */ +@NotNull +@NotBlank +@Data +@Builder +public class TopicPairActorParams { + + /** + * This contains the default parameters that are used when an operation doesn't + * specify them. Note: each operation to be used must still have an entry in + * {@link #operation}, even if it's empty. Otherwise, the given operation will not be + * started. + */ + private TopicPairParams defaults; + + /** + * Maps an operation name to its individual parameters. + */ + private Map<String, Map<String, Object>> operation; + + + /** + * Validates the parameters. + * + * @param name name of the object containing these parameters + * @return "this" + * @throws IllegalArgumentException if the parameters are invalid + */ + public TopicPairActorParams doValidation(String name) { + ValidationResult result = validate(name); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + return this; + } + + /** + * Validates the parameters. + * + * @param resultName name of the result + * + * @return the validation result + */ + public ValidationResult validate(String resultName) { + BeanValidationResult result = new BeanValidator().validateTop(resultName, this); + + if (defaults != null) { + result.addResult(defaults.validate("defaults")); + } + + // @formatter:off + result.validateMap("operation", operation, + (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); + // @formatter:on + + return result; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java index e6ba7298a..33fcf3052 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java @@ -29,34 +29,34 @@ import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; /** - * Parameters used by Operators that connect to a server via DMaaP. + * Parameters used by Operators that use a pair of Topics, one to publish requests and the + * other to receive responses. */ @NotNull @NotBlank @Data @Builder(toBuilder = true) -public class TopicParams { +public class TopicPairParams { /** - * Name of the target topic end point to which requests should be published. + * Source topic end point, from which to read responses. */ - private String target; + private String source; /** - * Source topic end point, from which to read responses. + * Name of the target topic end point to which requests should be published. */ - private String source; + private String target; /** - * Amount of time, in seconds to wait for the response, where zero indicates that it - * should wait forever. The default is zero. + * Amount of time, in seconds to wait for the response. The default is five minutes. */ - @Min(0) + @Min(1) @Builder.Default - private int timeoutSec = 0; + private int timeoutSec = 300; /** - * Validates both the publisher and the subscriber parameters. + * Validates the parameters. * * @param resultName name of the result * diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java new file mode 100644 index 000000000..8e9109c9e --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java @@ -0,0 +1,141 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Forwarder that selectively forwards message to listeners based on the content of the + * message. Each forwarder is associated with a single set of selector keys. Listeners are + * then registered with that forwarder for a particular set of values for the given keys. + */ +public class Forwarder { + private static final Logger logger = LoggerFactory.getLogger(Forwarder.class); + + /** + * Maps a set of field values to one or more listeners. + */ + // @formatter:off + private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>> + values2listeners = new ConcurrentHashMap<>(); + // @formatter:on + + /** + * Keys used to extract the field values from the {@link StandardCoderObject}. + */ + private final List<SelectorKey> keys; + + /** + * Constructs the object. + * + * @param keys keys used to extract the field's value from the + * {@link StandardCoderObject} + */ + public Forwarder(List<SelectorKey> keys) { + this.keys = keys; + } + + /** + * Registers a listener for messages containing the given field values. + * + * @param values field values of interest, in one-to-one correspondence with the keys + * @param listener listener to register + */ + public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) { + if (keys.size() != values.size()) { + throw new IllegalArgumentException("key/value mismatch"); + } + + values2listeners.compute(values, (key, listeners) -> { + Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners; + if (map == null) { + map = new ConcurrentHashMap<>(); + } + + map.put(listener, ""); + return map; + }); + } + + /** + * Unregisters a listener for messages containing the given field values. + * + * @param values field values of interest, in one-to-one correspondence with the keys + * @param listener listener to unregister + */ + public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) { + values2listeners.computeIfPresent(values, (key, listeners) -> { + listeners.remove(listener); + return (listeners.isEmpty() ? null : listeners); + }); + } + + /** + * Processes a message, forwarding it to the appropriate listeners, if any. + * + * @param infra communication infrastructure on which the response was received + * @param textMessage original text message that was received + * @param scoMessage decoded text message + */ + public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) { + // extract the key values from the message + List<String> values = new ArrayList<>(keys.size()); + for (SelectorKey key : keys) { + String value = key.extractField(scoMessage); + if (value == null) { + /* + * No value for this field, so this message is not relevant to this + * forwarder. + */ + return; + } + + values.add(value); + } + + // get the listeners for this set of values + Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners = + values2listeners.get(values); + if (listeners == null) { + // no listeners for this particular list of values + return; + } + + + // forward the message to each listener + for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) { + try { + listener.accept(infra, textMessage, scoMessage); + } catch (RuntimeException e) { + logger.warn("exception thrown by listener {}", Util.ident(listener), e); + } + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java new file mode 100644 index 000000000..fc5727395 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import lombok.EqualsAndHashCode; +import org.onap.policy.common.utils.coder.StandardCoderObject; + +/** + * Selector key, which contains a hierarchical list of Strings and Integers that are used + * to extract the content of a field, typically from a {@link StandardCoderObject}. + */ +@EqualsAndHashCode +public class SelectorKey { + + /** + * Names and indices used to extract the field's value. + */ + private final Object[] fieldIdentifiers; + + /** + * Constructs the object. + * + * @param fieldIdentifiers names and indices used to extract the field's value + */ + public SelectorKey(Object... fieldIdentifiers) { + this.fieldIdentifiers = fieldIdentifiers; + } + + /** + * Extracts the given field from an object. + * + * @param object object from which to extract the field + * @return the extracted value, or {@code null} if the object does not contain the + * field + */ + public String extractField(StandardCoderObject object) { + return object.getString(fieldIdentifiers); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java new file mode 100644 index 000000000..eb805ca5d --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A topic listener. When a message arrives on a topic, it is forwarded to listeners based + * on the content of fields found within the message. However, depending on the message + * type, the relevant fields might be found in different places within the message's + * object hierarchy. For each different list of keys, this class maintains a + * {@link Forwarder}, which is used to forward the message to all relevant listeners. + * <p/> + * Once a selector has been added, it is not removed until {@link #shutdown()} is invoked. + * As selectors are typically only added by Operators, and not by individual Operations, + * this should not pose a problem. + */ +public class TopicListenerImpl implements TopicListener { + private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class); + private static StandardCoder coder = new StandardCoder(); + + /** + * Maps selector to a forwarder. + */ + private final Map<List<SelectorKey>, Forwarder> selector2forwarder = new ConcurrentHashMap<>(); + + + /** + * Removes all forwarders. + */ + public void shutdown() { + selector2forwarder.clear(); + } + + /** + * Adds a forwarder, if it doesn't already exist. + * + * @param keys the selector keys + * @return the forwarder associated with the given selector keys + */ + public Forwarder addForwarder(SelectorKey... keys) { + return addForwarder(Arrays.asList(keys)); + } + + /** + * Adds a forwarder, if it doesn't already exist. + * + * @param keys the selector keys + * @return the forwarder associated with the given selector keys + */ + public Forwarder addForwarder(List<SelectorKey> keys) { + return selector2forwarder.computeIfAbsent(keys, key -> new Forwarder(keys)); + } + + /** + * Decodes the message and then forwards it to each forwarder for processing. + */ + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, String message) { + StandardCoderObject object; + try { + object = coder.decode(message, StandardCoderObject.class); + } catch (CoderException e) { + logger.warn("cannot decode message", e); + return; + } + + /* + * We don't know which selector is appropriate for the message, so we just let + * them all take a crack at it. + */ + for (Forwarder forwarder : selector2forwarder.values()) { + forwarder.onMessage(infra, message, object); + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java new file mode 100644 index 000000000..c0cfe2571 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java @@ -0,0 +1,122 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A pair of topics, one of which is used to publish requests and the other to receive + * responses. + */ +public class TopicPair extends TopicListenerImpl { + private static final Logger logger = LoggerFactory.getLogger(TopicPair.class); + + @Getter + private final String source; + + @Getter + private final String target; + + private final List<TopicSink> publishers; + private final List<TopicSource> subscribers; + + /** + * Constructs the object. + * + * @param source source topic name + * @param target target topic name + */ + public TopicPair(String source, String target) { + this.source = source; + this.target = target; + + publishers = getTopicEndpointManager().getTopicSinks(target); + if (publishers.isEmpty()) { + throw new IllegalArgumentException("no sinks for topic: " + target); + } + + subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source)); + if (subscribers.isEmpty()) { + throw new IllegalArgumentException("no sources for topic: " + source); + } + } + + /** + * Starts listening on the source topic(s). + */ + public void start() { + subscribers.forEach(topic -> topic.register(this)); + } + + /** + * Stops listening on the source topic(s). + */ + public void stop() { + subscribers.forEach(topic -> topic.unregister(this)); + } + + /** + * Stops listening on the source topic(s) and clears all of the forwarders. + */ + @Override + public void shutdown() { + stop(); + super.shutdown(); + } + + /** + * Publishes a message to the target topic. + * + * @param message message to be published + * @return a list of the infrastructures on which it was published + */ + public List<CommInfrastructure> publish(String message) { + List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size()); + + for (TopicSink topic : publishers) { + try { + topic.send(message); + infrastructures.add(topic.getTopicCommInfrastructure()); + + } catch (RuntimeException e) { + logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e); + } + } + + return infrastructures; + } + + // these may be overridden by junit tests + + protected TopicEndpoint getTopicEndpointManager() { + return TopicEndpointManager.getManager(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java new file mode 100644 index 000000000..c351f95f6 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +/** + * Manages topic pairs. + */ +@FunctionalInterface +public interface TopicPairManager { + + /** + * Gets the topic pair for the given parameters, creating one if it does not exist. + * + * @param source source topic name + * @param target target topic name + * @return the topic pair associated with the given source and target topics + */ + TopicPair getTopicPair(String source, String target); +} |