diff options
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src')
23 files changed, 2617 insertions, 56 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java index c4bf5f484..ba75f0be6 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java @@ -181,9 +181,9 @@ public abstract class HttpOperation<T> extends OperationPartial { try { response = makeCoder().decode(strResponse, responseClass); } catch (CoderException e) { - logger.warn("{}.{} cannot decode response with http error code {} for {}", params.getActor(), - params.getOperation(), rawResponse.getStatus(), params.getRequestId(), e); - return setOutcome(outcome, PolicyResult.FAILURE_EXCEPTION); + logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(), + params.getRequestId(), e); + throw new IllegalArgumentException("cannot decode response"); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java new file mode 100644 index 000000000..c3e1e5c4d --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; + +/** + * Actor that uses a topic pair. The actor's parameters must be a + * {@link TopicPairActorParams}. + */ +public class TopicPairActor extends ActorImpl implements TopicPairManager { + + /** + * Maps a topic source and target name to its topic pair. + */ + private final Map<Pair<String, String>, TopicPair> params2topic = new ConcurrentHashMap<>(); + + + /** + * Constructs the object. + * + * @param name actor's name + */ + public TopicPairActor(String name) { + super(name); + } + + @Override + protected void doStart() { + params2topic.values().forEach(TopicPair::start); + super.doStart(); + } + + @Override + protected void doStop() { + params2topic.values().forEach(TopicPair::stop); + super.doStop(); + } + + @Override + protected void doShutdown() { + params2topic.values().forEach(TopicPair::shutdown); + params2topic.clear(); + super.doShutdown(); + } + + @Override + public TopicPair getTopicPair(String source, String target) { + Pair<String, String> key = Pair.of(source, target); + return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target)); + } + + /** + * Translates the parameters to a {@link TopicPairActorParams} and then creates a + * function that will extract operator-specific parameters. + */ + @Override + protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { + String actorName = getName(); + + TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class); + ValidationResult result = params.validate(getName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + // create a map of the default parameters + Map<String, Object> defaultParams = Util.translateToMap(getName(), params.getDefaults()); + Map<String, Map<String, Object>> operations = params.getOperation(); + + return operationName -> { + Map<String, Object> specificParams = operations.get(operationName); + if (specificParams == null) { + return null; + } + + // start with a copy of defaults and overlay with specific + Map<String, Object> subparams = new TreeMap<>(defaultParams); + subparams.putAll(specificParams); + + return Util.translateToMap(getName() + "." + operationName, subparams); + }; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java new file mode 100644 index 000000000..6b584d7c6 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java @@ -0,0 +1,316 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.apache.commons.lang3.tuple.Triple; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Operation that uses a Topic pair. + * + * @param <S> response type + */ +@Getter +public abstract class TopicPairOperation<Q, S> extends OperationPartial { + private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class); + private static final Coder coder = new StandardCoder(); + + // fields extracted from the operator + + private final TopicPair topicPair; + private final Forwarder forwarder; + private final TopicPairParams pairParams; + private final long timeoutMs; + + /** + * Response class. + */ + private final Class<S> responseClass; + + + /** + * Constructs the object. + * + * @param params operation parameters + * @param operator operator that created this operation + * @param clazz response class + */ + public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) { + super(params, operator); + this.topicPair = operator.getTopicPair(); + this.forwarder = operator.getForwarder(); + this.pairParams = operator.getParams(); + this.responseClass = clazz; + this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS); + } + + /** + * If no timeout is specified, then it returns the default timeout. + */ + @Override + protected long getTimeoutMs(Integer timeoutSec) { + // TODO move this method to the superclass + return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec)); + } + + /** + * Publishes the request and arranges to receive the response. + */ + @Override + protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) { + + final Q request = makeRequest(attempt); + final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request); + + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future = + new CompletableFuture<>(); + final Executor executor = params.getExecutor(); + + // register a listener BEFORE publishing + + // @formatter:off + TriConsumer<CommInfrastructure, String, StandardCoderObject> listener = + (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse)); + // @formatter:on + + // TODO this currently only allows a single matching response + + forwarder.register(expectedKeyValues, listener); + + // ensure listener is unregistered if the controller is canceled + controller.add(() -> forwarder.unregister(expectedKeyValues, listener)); + + // publish the request + try { + publishRequest(request); + } catch (RuntimeException e) { + logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId()); + forwarder.unregister(expectedKeyValues, listener); + throw e; + } + + + // once "future" completes, process the response, and then complete the controller + + // @formatter:off + future.thenApplyAsync( + triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()), + executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; + } + + /** + * Makes the request. + * + * @param attempt operation attempt + * @return a new request + */ + protected abstract Q makeRequest(int attempt); + + /** + * Gets values, expected in the response, that should match the selector keys. + * + * @param attempt operation attempt + * @param request request to be published + * @return a list of the values to be matched by the selector keys + */ + protected abstract List<String> getExpectedKeyValues(int attempt, Q request); + + /** + * Publishes the request. Encodes the request, if it is not already a String. + * + * @param request request to be published + */ + protected void publishRequest(Q request) { + String json; + try { + if (request instanceof String) { + json = request.toString(); + } else { + json = makeCoder().encode(request); + } + } catch (CoderException e) { + throw new IllegalArgumentException("cannot encode request", e); + } + + List<CommInfrastructure> list = topicPair.publish(json); + if (list.isEmpty()) { + throw new IllegalStateException("nothing published"); + } + + logTopicRequest(list, request); + } + + /** + * Processes a response. + * + * @param infra communication infrastructure on which the response was received + * @param outcome outcome to be populated + * @param response raw response to process + * @param scoResponse response, as a {@link StandardCoderObject} + * @return the outcome + */ + protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse, + StandardCoderObject scoResponse) { + + logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); + + logTopicResponse(infra, rawResponse); + + S response; + if (responseClass == String.class) { + response = responseClass.cast(rawResponse); + + } else if (responseClass == StandardCoderObject.class) { + response = responseClass.cast(scoResponse); + + } else { + try { + response = makeCoder().decode(rawResponse, responseClass); + } catch (CoderException e) { + logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + throw new IllegalArgumentException("cannot decode response", e); + } + } + + if (!isSuccess(rawResponse, response)) { + logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + return setOutcome(outcome, PolicyResult.FAILURE); + } + + logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); + setOutcome(outcome, PolicyResult.SUCCESS); + postProcessResponse(outcome, rawResponse, response); + + return outcome; + } + + /** + * Processes a successful response. + * + * @param outcome outcome to be populated + * @param rawResponse raw response + * @param response decoded response + */ + protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) { + // do nothing + } + + /** + * Determines if the response indicates success. + * + * @param rawResponse raw response + * @param response decoded response + * @return {@code true} if the response indicates success, {@code false} otherwise + */ + protected abstract boolean isSuccess(String rawResponse, S response); + + /** + * Logs a TOPIC request. If the request is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param infrastructures list of communication infrastructures on which it was + * published + * @param request request to be logged + */ + protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) { + if (infrastructures.isEmpty()) { + return; + } + + String json; + try { + if (request == null) { + json = null; + } else if (request instanceof String) { + json = request.toString(); + } else { + json = makeCoder().encode(request, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print request", e); + json = request.toString(); + } + + for (CommInfrastructure infra : infrastructures) { + logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json); + } + } + + /** + * Logs a TOPIC response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param infra communication infrastructure on which the response was received + * @param response response to be logged + */ + protected <T> void logTopicResponse(CommInfrastructure infra, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = makeCoder().encode(response, true); + } + + } catch (CoderException e) { + logger.warn("cannot pretty-print response", e); + json = response.toString(); + } + + logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json); + } + + // these may be overridden by junit tests + + protected Coder makeCoder() { + return coder; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java new file mode 100644 index 000000000..8ce013388 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java @@ -0,0 +1,156 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import lombok.Getter; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; +import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; + +/** + * Operator that uses a pair of topics, one for publishing the request, and another for + * receiving the response. Topic operators may share a {@link TopicPair}. + */ +public abstract class TopicPairOperator extends OperatorPartial { + + /** + * Manager from which to get the topic pair. + */ + private final TopicPairManager pairManager; + + /** + * Keys used to extract the fields used to select responses for this operator. + */ + private final List<SelectorKey> selectorKeys; + + /* + * The remaining fields are initialized when configure() is invoked, thus they may + * change. + */ + + /** + * Current parameters. While {@link params} may change, the values contained within it + * will not, thus operations may copy it. + */ + @Getter + private TopicPairParams params; + + /** + * Topic pair associated with the parameters. + */ + @Getter + private TopicPair topicPair; + + /** + * Forwarder associated with the parameters. + */ + @Getter + private Forwarder forwarder; + + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + * @param pairManager manager from which to get the topic pair + * @param selectorKeys keys used to extract the fields used to select responses for + * this operator + */ + public TopicPairOperator(String actorName, String name, TopicPairManager pairManager, + List<SelectorKey> selectorKeys) { + super(actorName, name); + this.pairManager = pairManager; + this.selectorKeys = selectorKeys; + } + + @Override + protected void doConfigure(Map<String, Object> parameters) { + params = Util.translate(getFullName(), parameters, TopicPairParams.class); + ValidationResult result = params.validate(getFullName()); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget()); + forwarder = topicPair.addForwarder(selectorKeys); + } + + /** + * Makes an operator that will construct operations. + * + * @param <Q> request type + * @param <S> response type + * @param actorName actor name + * @param operation operation name + * @param pairManager manager from which to get the topic pair + * @param operationMaker function to make an operation + * @param keys keys used to extract the fields used to select responses for this + * operator + * @return a new operator + */ + // @formatter:off + public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, + BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker, + SelectorKey... keys) { + // @formatter:off + + return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker); + } + + /** + * Makes an operator that will construct operations. + * + * @param <Q> request type + * @param <S> response type + * @param actorName actor name + * @param operation operation name + * @param pairManager manager from which to get the topic pair + * @param keys keys used to extract the fields used to select responses for + * this operator + * @param operationMaker function to make an operation + * @return a new operator + */ + // @formatter:off + public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, + List<SelectorKey> keys, + BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) { + // @formatter:on + + return new TopicPairOperator(actorName, operation, pairManager, keys) { + @Override + public synchronized Operation buildOperation(ControlLoopOperationParams params) { + return operationMaker.apply(params, this); + } + }; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java new file mode 100644 index 000000000..42a44ee9c --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java @@ -0,0 +1,93 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import java.util.Map; +import lombok.Builder; +import lombok.Data; +import org.onap.policy.common.parameters.BeanValidationResult; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.parameters.annotations.NotBlank; +import org.onap.policy.common.parameters.annotations.NotNull; + +/** + * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests + * and the other to receive responses. + */ +@NotNull +@NotBlank +@Data +@Builder +public class TopicPairActorParams { + + /** + * This contains the default parameters that are used when an operation doesn't + * specify them. Note: each operation to be used must still have an entry in + * {@link #operation}, even if it's empty. Otherwise, the given operation will not be + * started. + */ + private TopicPairParams defaults; + + /** + * Maps an operation name to its individual parameters. + */ + private Map<String, Map<String, Object>> operation; + + + /** + * Validates the parameters. + * + * @param name name of the object containing these parameters + * @return "this" + * @throws IllegalArgumentException if the parameters are invalid + */ + public TopicPairActorParams doValidation(String name) { + ValidationResult result = validate(name); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + + return this; + } + + /** + * Validates the parameters. + * + * @param resultName name of the result + * + * @return the validation result + */ + public ValidationResult validate(String resultName) { + BeanValidationResult result = new BeanValidator().validateTop(resultName, this); + + if (defaults != null) { + result.addResult(defaults.validate("defaults")); + } + + // @formatter:off + result.validateMap("operation", operation, + (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); + // @formatter:on + + return result; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java index e6ba7298a..33fcf3052 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java @@ -29,34 +29,34 @@ import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; /** - * Parameters used by Operators that connect to a server via DMaaP. + * Parameters used by Operators that use a pair of Topics, one to publish requests and the + * other to receive responses. */ @NotNull @NotBlank @Data @Builder(toBuilder = true) -public class TopicParams { +public class TopicPairParams { /** - * Name of the target topic end point to which requests should be published. + * Source topic end point, from which to read responses. */ - private String target; + private String source; /** - * Source topic end point, from which to read responses. + * Name of the target topic end point to which requests should be published. */ - private String source; + private String target; /** - * Amount of time, in seconds to wait for the response, where zero indicates that it - * should wait forever. The default is zero. + * Amount of time, in seconds to wait for the response. The default is five minutes. */ - @Min(0) + @Min(1) @Builder.Default - private int timeoutSec = 0; + private int timeoutSec = 300; /** - * Validates both the publisher and the subscriber parameters. + * Validates the parameters. * * @param resultName name of the result * diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java new file mode 100644 index 000000000..8e9109c9e --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java @@ -0,0 +1,141 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Forwarder that selectively forwards message to listeners based on the content of the + * message. Each forwarder is associated with a single set of selector keys. Listeners are + * then registered with that forwarder for a particular set of values for the given keys. + */ +public class Forwarder { + private static final Logger logger = LoggerFactory.getLogger(Forwarder.class); + + /** + * Maps a set of field values to one or more listeners. + */ + // @formatter:off + private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>> + values2listeners = new ConcurrentHashMap<>(); + // @formatter:on + + /** + * Keys used to extract the field values from the {@link StandardCoderObject}. + */ + private final List<SelectorKey> keys; + + /** + * Constructs the object. + * + * @param keys keys used to extract the field's value from the + * {@link StandardCoderObject} + */ + public Forwarder(List<SelectorKey> keys) { + this.keys = keys; + } + + /** + * Registers a listener for messages containing the given field values. + * + * @param values field values of interest, in one-to-one correspondence with the keys + * @param listener listener to register + */ + public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) { + if (keys.size() != values.size()) { + throw new IllegalArgumentException("key/value mismatch"); + } + + values2listeners.compute(values, (key, listeners) -> { + Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners; + if (map == null) { + map = new ConcurrentHashMap<>(); + } + + map.put(listener, ""); + return map; + }); + } + + /** + * Unregisters a listener for messages containing the given field values. + * + * @param values field values of interest, in one-to-one correspondence with the keys + * @param listener listener to unregister + */ + public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) { + values2listeners.computeIfPresent(values, (key, listeners) -> { + listeners.remove(listener); + return (listeners.isEmpty() ? null : listeners); + }); + } + + /** + * Processes a message, forwarding it to the appropriate listeners, if any. + * + * @param infra communication infrastructure on which the response was received + * @param textMessage original text message that was received + * @param scoMessage decoded text message + */ + public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) { + // extract the key values from the message + List<String> values = new ArrayList<>(keys.size()); + for (SelectorKey key : keys) { + String value = key.extractField(scoMessage); + if (value == null) { + /* + * No value for this field, so this message is not relevant to this + * forwarder. + */ + return; + } + + values.add(value); + } + + // get the listeners for this set of values + Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners = + values2listeners.get(values); + if (listeners == null) { + // no listeners for this particular list of values + return; + } + + + // forward the message to each listener + for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) { + try { + listener.accept(infra, textMessage, scoMessage); + } catch (RuntimeException e) { + logger.warn("exception thrown by listener {}", Util.ident(listener), e); + } + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java new file mode 100644 index 000000000..fc5727395 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKey.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import lombok.EqualsAndHashCode; +import org.onap.policy.common.utils.coder.StandardCoderObject; + +/** + * Selector key, which contains a hierarchical list of Strings and Integers that are used + * to extract the content of a field, typically from a {@link StandardCoderObject}. + */ +@EqualsAndHashCode +public class SelectorKey { + + /** + * Names and indices used to extract the field's value. + */ + private final Object[] fieldIdentifiers; + + /** + * Constructs the object. + * + * @param fieldIdentifiers names and indices used to extract the field's value + */ + public SelectorKey(Object... fieldIdentifiers) { + this.fieldIdentifiers = fieldIdentifiers; + } + + /** + * Extracts the given field from an object. + * + * @param object object from which to extract the field + * @return the extracted value, or {@code null} if the object does not contain the + * field + */ + public String extractField(StandardCoderObject object) { + return object.getString(fieldIdentifiers); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java new file mode 100644 index 000000000..eb805ca5d --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A topic listener. When a message arrives on a topic, it is forwarded to listeners based + * on the content of fields found within the message. However, depending on the message + * type, the relevant fields might be found in different places within the message's + * object hierarchy. For each different list of keys, this class maintains a + * {@link Forwarder}, which is used to forward the message to all relevant listeners. + * <p/> + * Once a selector has been added, it is not removed until {@link #shutdown()} is invoked. + * As selectors are typically only added by Operators, and not by individual Operations, + * this should not pose a problem. + */ +public class TopicListenerImpl implements TopicListener { + private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class); + private static StandardCoder coder = new StandardCoder(); + + /** + * Maps selector to a forwarder. + */ + private final Map<List<SelectorKey>, Forwarder> selector2forwarder = new ConcurrentHashMap<>(); + + + /** + * Removes all forwarders. + */ + public void shutdown() { + selector2forwarder.clear(); + } + + /** + * Adds a forwarder, if it doesn't already exist. + * + * @param keys the selector keys + * @return the forwarder associated with the given selector keys + */ + public Forwarder addForwarder(SelectorKey... keys) { + return addForwarder(Arrays.asList(keys)); + } + + /** + * Adds a forwarder, if it doesn't already exist. + * + * @param keys the selector keys + * @return the forwarder associated with the given selector keys + */ + public Forwarder addForwarder(List<SelectorKey> keys) { + return selector2forwarder.computeIfAbsent(keys, key -> new Forwarder(keys)); + } + + /** + * Decodes the message and then forwards it to each forwarder for processing. + */ + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, String message) { + StandardCoderObject object; + try { + object = coder.decode(message, StandardCoderObject.class); + } catch (CoderException e) { + logger.warn("cannot decode message", e); + return; + } + + /* + * We don't know which selector is appropriate for the message, so we just let + * them all take a crack at it. + */ + for (Forwarder forwarder : selector2forwarder.values()) { + forwarder.onMessage(infra, message, object); + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java new file mode 100644 index 000000000..c0cfe2571 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java @@ -0,0 +1,122 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import lombok.Getter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A pair of topics, one of which is used to publish requests and the other to receive + * responses. + */ +public class TopicPair extends TopicListenerImpl { + private static final Logger logger = LoggerFactory.getLogger(TopicPair.class); + + @Getter + private final String source; + + @Getter + private final String target; + + private final List<TopicSink> publishers; + private final List<TopicSource> subscribers; + + /** + * Constructs the object. + * + * @param source source topic name + * @param target target topic name + */ + public TopicPair(String source, String target) { + this.source = source; + this.target = target; + + publishers = getTopicEndpointManager().getTopicSinks(target); + if (publishers.isEmpty()) { + throw new IllegalArgumentException("no sinks for topic: " + target); + } + + subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source)); + if (subscribers.isEmpty()) { + throw new IllegalArgumentException("no sources for topic: " + source); + } + } + + /** + * Starts listening on the source topic(s). + */ + public void start() { + subscribers.forEach(topic -> topic.register(this)); + } + + /** + * Stops listening on the source topic(s). + */ + public void stop() { + subscribers.forEach(topic -> topic.unregister(this)); + } + + /** + * Stops listening on the source topic(s) and clears all of the forwarders. + */ + @Override + public void shutdown() { + stop(); + super.shutdown(); + } + + /** + * Publishes a message to the target topic. + * + * @param message message to be published + * @return a list of the infrastructures on which it was published + */ + public List<CommInfrastructure> publish(String message) { + List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size()); + + for (TopicSink topic : publishers) { + try { + topic.send(message); + infrastructures.add(topic.getTopicCommInfrastructure()); + + } catch (RuntimeException e) { + logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e); + } + } + + return infrastructures; + } + + // these may be overridden by junit tests + + protected TopicEndpoint getTopicEndpointManager() { + return TopicEndpointManager.getManager(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java new file mode 100644 index 000000000..c351f95f6 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +/** + * Manages topic pairs. + */ +@FunctionalInterface +public interface TopicPairManager { + + /** + * Gets the topic pair for the given parameters, creating one if it does not exist. + * + * @param source source topic name + * @param target target topic name + * @return the topic pair associated with the given source and target topics + */ + TopicPair getTopicPair(String source, String target); +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java index 19f781d61..39d6fd431 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java @@ -22,6 +22,7 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -359,8 +360,7 @@ public class HttpOperationTest { public void testProcessResponseDecodeExcept() throws CoderException { MyGetOperation<Integer> oper2 = new MyGetOperation<>(Integer.class); - assertSame(outcome, oper2.processResponse(outcome, PATH, response)); - assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult()); + assertThatIllegalArgumentException().isThrownBy(() -> oper2.processResponse(outcome, PATH, response)); } @Test diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java new file mode 100644 index 000000000..6515eb37c --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; + +/** + * Executor that will run tasks until the queue is empty or a maximum number of tasks have + * been executed. Doesn't actually run anything until {@link #runAll()} is invoked. + */ +public class MyExec implements Executor { + + // TODO move this to policy-common/utils-test + + private final int maxTasks; + private final Queue<Runnable> commands = new LinkedList<>(); + + public MyExec(int maxTasks) { + this.maxTasks = maxTasks; + } + + public int getQueueLength() { + return commands.size(); + } + + @Override + public void execute(Runnable command) { + commands.add(command); + } + + /** + * Runs all tasks until the queue is empty or the maximum number of tasks have been + * reached. + * + * @return {@code true} if the queue is empty, {@code false} if the maximum number of + * tasks have been reached before the queue was completed + */ + public boolean runAll() { + for (int count = 0; count < maxTasks && !commands.isEmpty(); ++count) { + commands.remove().run(); + } + + return commands.isEmpty(); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java index 0d5cb2444..f28c1f6c6 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java @@ -36,7 +36,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -105,7 +104,7 @@ public class OperationPartialTest { event.setRequestId(REQ_ID); context = new ControlLoopEventContext(event); - executor = new MyExec(); + executor = new MyExec(100 * MAX_PARALLEL_REQUESTS); params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context) .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT) @@ -1267,36 +1266,4 @@ public class OperationPartialTest { return 0L; } } - - /** - * Executor that will run tasks until the queue is empty or a maximum number of tasks - * have been executed. Doesn't actually run anything until {@link #runAll()} is - * invoked. - */ - private static class MyExec implements Executor { - private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100; - - private Queue<Runnable> commands = new LinkedList<>(); - - public MyExec() { - // do nothing - } - - public int getQueueLength() { - return commands.size(); - } - - @Override - public void execute(Runnable command) { - commands.add(command); - } - - public boolean runAll() { - for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) { - commands.remove().run(); - } - - return commands.isEmpty(); - } - } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java new file mode 100644 index 000000000..4e45b1abe --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java @@ -0,0 +1,503 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import ch.qos.logback.classic.Logger; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.Setter; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.LoggerFactory; + +public class TopicPairOperationTest { + private static final List<CommInfrastructure> INFRA_LIST = + Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB); + private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception"); + private static final String ACTOR = "my-actor"; + private static final String OPERATION = "my-operation"; + private static final String REQ_ID = "my-request-id"; + private static final String MY_SOURCE = "my-source"; + private static final String MY_TARGET = "my-target"; + private static final String TEXT = "some text"; + private static final int TIMEOUT_SEC = 10; + private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC; + + private static final StandardCoder coder = new StandardCoder(); + + /** + * Used to attach an appender to the class' logger. + */ + private static final Logger logger = (Logger) LoggerFactory.getLogger(TopicPairOperation.class); + private static final ExtractAppender appender = new ExtractAppender(); + + @Mock + private TopicPairOperator operator; + @Mock + private TopicPair pair; + @Mock + private Forwarder forwarder; + + @Captor + private ArgumentCaptor<TriConsumer<CommInfrastructure, String, StandardCoderObject>> listenerCaptor; + + private ControlLoopOperationParams params; + private TopicPairParams topicParams; + private OperationOutcome outcome; + private StandardCoderObject stdResponse; + private String responseText; + private MyExec executor; + private TopicPairOperation<MyRequest, MyResponse> oper; + + /** + * Attaches the appender to the logger. + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + /** + * Attach appender to the logger. + */ + appender.setContext(logger.getLoggerContext()); + appender.start(); + + logger.addAppender(appender); + } + + /** + * Stops the appender. + */ + @AfterClass + public static void tearDownAfterClass() { + appender.stop(); + } + + /** + * Sets up. + */ + @Before + public void setUp() throws CoderException { + MockitoAnnotations.initMocks(this); + + appender.clearExtractions(); + + topicParams = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build(); + + when(operator.getActorName()).thenReturn(ACTOR); + when(operator.getName()).thenReturn(OPERATION); + when(operator.getTopicPair()).thenReturn(pair); + when(operator.getForwarder()).thenReturn(forwarder); + when(operator.getParams()).thenReturn(topicParams); + when(operator.isAlive()).thenReturn(true); + + when(pair.publish(any())).thenReturn(INFRA_LIST); + + executor = new MyExec(100); + + params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build(); + outcome = params.makeOutcome(); + + responseText = coder.encode(new MyResponse()); + stdResponse = coder.decode(responseText, StandardCoderObject.class); + + oper = new MyOperation(); + } + + @Test + public void testTopicPairOperation_testGetTopicPair_testGetForwarder_testGetPairParams() { + assertEquals(ACTOR, oper.getActorName()); + assertEquals(OPERATION, oper.getName()); + assertSame(pair, oper.getTopicPair()); + assertSame(forwarder, oper.getForwarder()); + assertSame(topicParams, oper.getPairParams()); + assertEquals(TIMEOUT_MS, oper.getTimeoutMs()); + assertSame(MyResponse.class, oper.getResponseClass()); + } + + @Test + public void testStartOperationAsync() throws Exception { + CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome); + assertFalse(future.isDone()); + + verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture()); + + verify(forwarder, never()).unregister(any(), any()); + + verify(pair).publish(any()); + + // provide the response + listenerCaptor.getValue().accept(CommInfrastructure.NOOP, responseText, stdResponse); + + // run the tasks + assertTrue(executor.runAll()); + + assertTrue(future.isDone()); + + assertSame(outcome, future.get(5, TimeUnit.SECONDS)); + assertEquals(PolicyResult.SUCCESS, outcome.getResult()); + + verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue())); + } + + /** + * Tests startOperationAsync() when the publisher throws an exception. + */ + @Test + public void testStartOperationAsyncException() throws Exception { + // indicate that nothing was published + when(pair.publish(any())).thenReturn(Arrays.asList()); + + assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome)); + + verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture()); + + // must still unregister + verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue())); + } + + @Test + public void testGetTimeoutMsInteger() { + // use default + assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null)); + assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0)); + + // use provided value + assertEquals(5000, oper.getTimeoutMs(5)); + } + + @Test + public void testPublishRequest() { + oper.publishRequest(new MyRequest()); + assertEquals(INFRA_LIST.size(), appender.getExtracted().size()); + } + + /** + * Tests publishRequest() when nothing is published. + */ + @Test + public void testPublishRequestUnpublished() { + when(pair.publish(any())).thenReturn(Arrays.asList()); + assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest())); + } + + /** + * Tests publishRequest() when the request type is a String. + */ + @Test + public void testPublishRequestString() { + MyStringOperation oper2 = new MyStringOperation(); + oper2.publishRequest(TEXT); + assertEquals(INFRA_LIST.size(), appender.getExtracted().size()); + } + + /** + * Tests publishRequest() when the coder throws an exception. + */ + @Test + public void testPublishRequestException() { + setOperCoderException(); + assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest())); + } + + /** + * Tests processResponse() when it's a success and the response type is a String. + */ + @Test + public void testProcessResponseSuccessString() { + MyStringOperation oper2 = new MyStringOperation(); + + assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, TEXT, null)); + assertEquals(PolicyResult.SUCCESS, outcome.getResult()); + } + + /** + * Tests processResponse() when it's a success and the response type is a + * StandardCoderObject. + */ + @Test + public void testProcessResponseSuccessSco() { + MyScoOperation oper2 = new MyScoOperation(); + + assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse)); + assertEquals(PolicyResult.SUCCESS, outcome.getResult()); + } + + /** + * Tests processResponse() when it's a failure. + */ + @Test + public void testProcessResponseFailure() throws CoderException { + // indicate error in the response + MyResponse resp = new MyResponse(); + resp.setOutput("error"); + + responseText = coder.encode(resp); + stdResponse = coder.decode(responseText, StandardCoderObject.class); + + assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse)); + assertEquals(PolicyResult.FAILURE, outcome.getResult()); + } + + /** + * Tests processResponse() when the decoder succeeds. + */ + @Test + public void testProcessResponseDecodeOk() throws CoderException { + assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse)); + assertEquals(PolicyResult.SUCCESS, outcome.getResult()); + } + + /** + * Tests processResponse() when the decoder throws an exception. + */ + @Test + public void testProcessResponseDecodeExcept() throws CoderException { + // @formatter:off + assertThatIllegalArgumentException().isThrownBy( + () -> oper.processResponse(CommInfrastructure.NOOP, outcome, "{invalid json", stdResponse)); + // @formatter:on + } + + @Test + public void testPostProcessResponse() { + assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException(); + } + + @Test + public void testLogTopicRequest() { + // nothing to log + appender.clearExtractions(); + oper.logTopicRequest(Arrays.asList(), new MyRequest()); + assertEquals(0, appender.getExtracted().size()); + + // log structured data + appender.clearExtractions(); + oper.logTopicRequest(INFRA_LIST, new MyRequest()); + List<String> output = appender.getExtracted(); + assertEquals(2, output.size()); + + assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()) + .contains("{\n \"theRequestId\": \"my-request-id\"\n}"); + + assertThat(output.get(1)).contains(CommInfrastructure.UEB.toString()) + .contains("{\n \"theRequestId\": \"my-request-id\"\n}"); + + // log a plain string + appender.clearExtractions(); + new MyStringOperation().logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), TEXT); + output = appender.getExtracted(); + assertEquals(1, output.size()); + assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT); + + // log a null request + appender.clearExtractions(); + oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), null); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null"); + + // exception from coder + setOperCoderException(); + + appender.clearExtractions(); + oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), new MyRequest()); + output = appender.getExtracted(); + assertEquals(2, output.size()); + assertThat(output.get(0)).contains("cannot pretty-print request"); + assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString()); + } + + @Test + public void testLogTopicResponse() { + // log structured data + appender.clearExtractions(); + oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse()); + List<String> output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()) + .contains("{\n \"requestId\": \"my-request-id\"\n}"); + + // log a plain string + appender.clearExtractions(); + new MyStringOperation().logTopicResponse(CommInfrastructure.NOOP, TEXT); + output = appender.getExtracted(); + assertEquals(1, output.size()); + assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT); + + // log a null response + appender.clearExtractions(); + oper.logTopicResponse(CommInfrastructure.NOOP, null); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null"); + + // exception from coder + setOperCoderException(); + + appender.clearExtractions(); + oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse()); + output = appender.getExtracted(); + assertEquals(2, output.size()); + assertThat(output.get(0)).contains("cannot pretty-print response"); + assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString()); + } + + @Test + public void testMakeCoder() { + assertNotNull(oper.makeCoder()); + } + + /** + * Creates a new {@link #oper} whose coder will throw an exception. + */ + private void setOperCoderException() { + oper = new MyOperation() { + @Override + protected Coder makeCoder() { + return new StandardCoder() { + @Override + public String encode(Object object, boolean pretty) throws CoderException { + throw new CoderException(EXPECTED_EXCEPTION); + } + }; + } + }; + } + + @Getter + @Setter + public static class MyRequest { + private String theRequestId = REQ_ID; + private String input; + } + + @Getter + @Setter + public static class MyResponse { + private String requestId = REQ_ID; + private String output; + } + + + private class MyStringOperation extends TopicPairOperation<String, String> { + public MyStringOperation() { + super(TopicPairOperationTest.this.params, operator, String.class); + } + + @Override + protected String makeRequest(int attempt) { + return TEXT; + } + + @Override + protected List<String> getExpectedKeyValues(int attempt, String request) { + return Arrays.asList(REQ_ID); + } + + @Override + protected boolean isSuccess(String rawResponse, String response) { + return (response != null); + } + } + + + private class MyScoOperation extends TopicPairOperation<MyRequest, StandardCoderObject> { + public MyScoOperation() { + super(TopicPairOperationTest.this.params, operator, StandardCoderObject.class); + } + + @Override + protected MyRequest makeRequest(int attempt) { + return new MyRequest(); + } + + @Override + protected List<String> getExpectedKeyValues(int attempt, MyRequest request) { + return Arrays.asList(REQ_ID); + } + + @Override + protected boolean isSuccess(String rawResponse, StandardCoderObject response) { + return (response.getString("output") == null); + } + } + + + private class MyOperation extends TopicPairOperation<MyRequest, MyResponse> { + public MyOperation() { + super(TopicPairOperationTest.this.params, operator, MyResponse.class); + } + + @Override + protected MyRequest makeRequest(int attempt) { + return new MyRequest(); + } + + @Override + protected List<String> getExpectedKeyValues(int attempt, MyRequest request) { + return Arrays.asList(REQ_ID); + } + + @Override + protected boolean isSuccess(String rawResponse, MyResponse response) { + return (response.getOutput() == null); + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java new file mode 100644 index 000000000..dd25902d6 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java @@ -0,0 +1,140 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; +import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; +import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; + +public class TopicPairOperatorTest { + private static final String ACTOR = "my-actor"; + private static final String OPERATION = "my-operation"; + private static final String MY_SOURCE = "my-source"; + private static final String MY_TARGET = "my-target"; + private static final int TIMEOUT_SEC = 10; + + @Mock + private TopicPairManager mgr; + @Mock + private TopicPair pair; + @Mock + private Forwarder forwarder; + @Mock + private TopicPairOperation<String, Integer> operation; + + private List<SelectorKey> keys; + private TopicPairParams params; + private MyOperator oper; + + /** + * Sets up. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + keys = List.of(new SelectorKey("")); + + when(mgr.getTopicPair(MY_SOURCE, MY_TARGET)).thenReturn(pair); + when(pair.addForwarder(keys)).thenReturn(forwarder); + + oper = new MyOperator(keys); + + params = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build(); + oper.configure(Util.translateToMap(OPERATION, params)); + oper.start(); + } + + @Test + public void testTopicPairOperator_testGetParams_testGetTopicPair_testGetForwarder() { + assertEquals(ACTOR, oper.getActorName()); + assertEquals(OPERATION, oper.getName()); + assertEquals(params, oper.getParams()); + assertSame(pair, oper.getTopicPair()); + assertSame(forwarder, oper.getForwarder()); + } + + @Test + public void testDoConfigure() { + oper.stop(); + + // invalid parameters + params.setSource(null); + assertThatThrownBy(() -> oper.configure(Util.translateToMap(OPERATION, params))) + .isInstanceOf(ParameterValidationRuntimeException.class); + } + + @Test + public void testMakeOperator() { + AtomicReference<ControlLoopOperationParams> paramsRef = new AtomicReference<>(); + AtomicReference<TopicPairOperator> operRef = new AtomicReference<>(); + + // @formatter:off + BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<String, Integer>> maker = + (params, operator) -> { + paramsRef.set(params); + operRef.set(operator); + return operation; + }; + // @formatter:on + + TopicPairOperator oper2 = TopicPairOperator.makeOperator(ACTOR, OPERATION, mgr, maker, new SelectorKey("")); + + assertEquals(ACTOR, oper2.getActorName()); + assertEquals(OPERATION, oper2.getName()); + + ControlLoopOperationParams params2 = ControlLoopOperationParams.builder().build(); + + assertSame(operation, oper2.buildOperation(params2)); + assertSame(params2, paramsRef.get()); + assertSame(oper2, operRef.get()); + } + + + private class MyOperator extends TopicPairOperator { + public MyOperator(List<SelectorKey> selectorKeys) { + super(ACTOR, OPERATION, mgr, selectorKeys); + } + + @Override + public Operation buildOperation(ControlLoopOperationParams params) { + return null; + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java new file mode 100644 index 000000000..4322c5f39 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java @@ -0,0 +1,132 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; + +public class TopicPairActorParamsTest { + private static final String MY_NAME = "my-name"; + + private static final String DFLT_SOURCE = "default-source"; + private static final String DFLT_TARGET = "default-target"; + private static final int DFLT_TIMEOUT = 10; + + private static final String OPER1_NAME = "oper A"; + private static final String OPER1_SOURCE = "source A"; + private static final String OPER1_TARGET = "target A"; + private static final int OPER1_TIMEOUT = 20; + + // oper2 uses some default values + private static final String OPER2_NAME = "oper B"; + private static final String OPER2_SOURCE = "source B"; + + // oper3 uses default values for everything + private static final String OPER3_NAME = "oper C"; + + private TopicPairParams defaults; + private Map<String, Map<String, Object>> operMap; + private TopicPairActorParams params; + + + /** + * Sets up. + */ + @Before + public void setUp() { + defaults = TopicPairParams.builder().source(DFLT_SOURCE).target(DFLT_TARGET).timeoutSec(DFLT_TIMEOUT).build(); + + TopicPairParams oper1 = TopicPairParams.builder().source(OPER1_SOURCE).target(OPER1_TARGET) + .timeoutSec(OPER1_TIMEOUT).build(); + + Map<String, Object> oper1Map = Util.translateToMap(OPER1_NAME, oper1); + Map<String, Object> oper2Map = Map.of("source", OPER2_SOURCE); + Map<String, Object> oper3Map = Collections.emptyMap(); + operMap = Map.of(OPER1_NAME, oper1Map, OPER2_NAME, oper2Map, OPER3_NAME, oper3Map); + + params = TopicPairActorParams.builder().defaults(defaults).operation(operMap).build(); + + } + + @Test + public void testTopicPairActorParams() { + assertSame(defaults, params.getDefaults()); + assertSame(operMap, params.getOperation()); + } + + @Test + public void testDoValidation() { + assertSame(params, params.doValidation(MY_NAME)); + + // test with invalid parameters + defaults.setTimeoutSec(-1); + assertThatThrownBy(() -> params.doValidation(MY_NAME)).isInstanceOf(ParameterValidationRuntimeException.class); + } + + @Test + public void testValidate() { + ValidationResult result; + + // null defaults + params.setDefaults(null); + result = params.validate(MY_NAME); + assertFalse(result.isValid()); + assertThat(result.getResult()).contains("defaults").contains("null"); + params.setDefaults(defaults); + + // invalid value in defaults + defaults.setTimeoutSec(-1); + result = params.validate(MY_NAME); + assertFalse(result.isValid()); + assertThat(result.getResult()).contains("defaults").contains("timeoutSec"); + defaults.setTimeoutSec(DFLT_TIMEOUT); + + // null map + params.setOperation(null); + result = params.validate(MY_NAME); + assertFalse(result.isValid()); + assertThat(result.getResult()).contains("operation"); + params.setOperation(operMap); + + // null entry in the map + Map<String, Map<String, Object>> map2 = new TreeMap<>(operMap); + map2.put(OPER2_NAME, null); + params.setOperation(map2); + result = params.validate(MY_NAME); + assertFalse(result.isValid()); + assertThat(result.getResult()).contains("operation").contains("null"); + params.setOperation(operMap); + + // test success case + assertTrue(params.validate(MY_NAME).isValid()); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParamsTest.java index 77c9af362..d63c833d1 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParamsTest.java @@ -29,20 +29,20 @@ import java.util.function.Function; import org.junit.Before; import org.junit.Test; import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicParams.TopicParamsBuilder; +import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams.TopicPairParamsBuilder; -public class TopicParamsTest { +public class TopicPairParamsTest { private static final String CONTAINER = "my-container"; private static final String TARGET = "my-target"; private static final String SOURCE = "my-source"; private static final int TIMEOUT = 10; - private TopicParams params; + private TopicPairParams params; @Before public void setUp() { - params = TopicParams.builder().target(TARGET).source(SOURCE).timeoutSec(TIMEOUT).build(); + params = TopicPairParams.builder().target(TARGET).source(SOURCE).timeoutSec(TIMEOUT).build(); } @Test @@ -52,8 +52,11 @@ public class TopicParamsTest { testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1)); // check edge cases - assertTrue(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid()); + assertFalse(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid()); assertTrue(params.toBuilder().timeoutSec(1).build().validate(CONTAINER).isValid()); + + // some default values should be valid + assertTrue(TopicPairParams.builder().target(TARGET).source(SOURCE).build().validate(CONTAINER).isValid()); } @Test @@ -66,7 +69,7 @@ public class TopicParamsTest { } private void testValidateField(String fieldName, String expected, - Function<TopicParamsBuilder, TopicParamsBuilder> makeInvalid) { + Function<TopicPairParamsBuilder, TopicPairParamsBuilder> makeInvalid) { // original params should be valid ValidationResult result = params.validate(CONTAINER); diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java new file mode 100644 index 000000000..24f8b70a8 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java @@ -0,0 +1,201 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.controlloop.actorserviceprovider.Util; + +public class ForwarderTest { + private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; + private static final String TEXT = "some text"; + + private static final String KEY1 = "requestId"; + private static final String KEY2 = "container"; + private static final String SUBKEY = "subRequestId"; + + private static final String VALUEA_REQID = "hello"; + private static final String VALUEA_SUBREQID = "world"; + + // request id is shared with value A + private static final String VALUEB_REQID = "hello"; + private static final String VALUEB_SUBREQID = "another world"; + + // unique values + private static final String VALUEC_REQID = "bye"; + private static final String VALUEC_SUBREQID = "bye-bye"; + + @Mock + private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1; + + @Mock + private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1b; + + @Mock + private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener2; + + @Mock + private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener3; + + private Forwarder forwarder; + + + /** + * Sets up. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + forwarder = new Forwarder(Arrays.asList(new SelectorKey(KEY1), new SelectorKey(KEY2, SUBKEY))); + + forwarder.register(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1); + forwarder.register(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1b); + forwarder.register(Arrays.asList(VALUEB_REQID, VALUEB_SUBREQID), listener2); + forwarder.register(Arrays.asList(VALUEC_REQID, VALUEC_SUBREQID), listener3); + } + + @Test + public void testRegister() { + // key size mismatches + assertThatIllegalArgumentException().isThrownBy(() -> forwarder.register(Arrays.asList(), listener1)) + .withMessage("key/value mismatch"); + assertThatIllegalArgumentException() + .isThrownBy(() -> forwarder.register(Arrays.asList(VALUEA_REQID), listener1)) + .withMessage("key/value mismatch"); + } + + @Test + public void testUnregister() { + // remove listener1b + forwarder.unregister(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1b); + + StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); + forwarder.onMessage(INFRA, TEXT, sco); + + verify(listener1).accept(INFRA, TEXT, sco); + verify(listener1b, never()).accept(any(), any(), any()); + + // remove listener1 + forwarder.unregister(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1); + forwarder.onMessage(INFRA, TEXT, sco); + + // route a message to listener2 + sco = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEB_SUBREQID))); + forwarder.onMessage(INFRA, TEXT, sco); + verify(listener2).accept(INFRA, TEXT, sco); + + // no more messages to listener1 or 1b + verify(listener1).accept(any(), any(), any()); + verify(listener1b, never()).accept(any(), any(), any()); + } + + @Test + public void testOnMessage() { + StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); + forwarder.onMessage(INFRA, TEXT, sco); + + verify(listener1).accept(INFRA, TEXT, sco); + verify(listener1b).accept(INFRA, TEXT, sco); + + // repeat - counts should increment + forwarder.onMessage(INFRA, TEXT, sco); + + verify(listener1, times(2)).accept(INFRA, TEXT, sco); + verify(listener1b, times(2)).accept(INFRA, TEXT, sco); + + // should not have been invoked + verify(listener2, never()).accept(any(), any(), any()); + verify(listener3, never()).accept(any(), any(), any()); + + // try other listeners now + sco = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEB_SUBREQID))); + forwarder.onMessage(INFRA, TEXT, sco); + verify(listener2).accept(INFRA, TEXT, sco); + + sco = makeMessage(Map.of(KEY1, VALUEC_REQID, KEY2, Map.of(SUBKEY, VALUEC_SUBREQID))); + forwarder.onMessage(INFRA, TEXT, sco); + verify(listener3).accept(INFRA, TEXT, sco); + + // message has no listeners + sco = makeMessage(Map.of(KEY1, "xyzzy", KEY2, Map.of(SUBKEY, VALUEB_SUBREQID))); + forwarder.onMessage(INFRA, TEXT, sco); + + // message doesn't have both keys + sco = makeMessage(Map.of(KEY1, VALUEA_REQID)); + forwarder.onMessage(INFRA, TEXT, sco); + + // counts should not have incremented + verify(listener1, times(2)).accept(any(), any(), any()); + verify(listener1b, times(2)).accept(any(), any(), any()); + verify(listener2).accept(any(), any(), any()); + verify(listener3).accept(any(), any(), any()); + + // listener throws an exception + doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any(), any()); + } + + /* + * Tests onMessage() when listener1 throws an exception. + */ + @Test + public void testOnMessageListenerException1() { + doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any(), any()); + + StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); + forwarder.onMessage(INFRA, TEXT, sco); + + verify(listener1b).accept(INFRA, TEXT, sco); + } + + /* + * Tests onMessage() when listener1b throws an exception. + */ + @Test + public void testOnMessageListenerException1b() { + doThrow(new IllegalStateException("expected exception")).when(listener1b).accept(any(), any(), any()); + + StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); + forwarder.onMessage(INFRA, TEXT, sco); + + verify(listener1).accept(INFRA, TEXT, sco); + } + + /** + * Makes a message from a map. + */ + private StandardCoderObject makeMessage(Map<String, Object> map) { + return Util.translate("", map, StandardCoderObject.class); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKeyTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKeyTest.java new file mode 100644 index 000000000..19df9c2d8 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/SelectorKeyTest.java @@ -0,0 +1,93 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.Map; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.controlloop.actorserviceprovider.Util; + +public class SelectorKeyTest { + private static final String FIELD1 = "map"; + private static final String FIELD2 = "abc"; + private static final String FIELDX = "abd"; + + private SelectorKey key; + + @Before + public void setUp() { + key = new SelectorKey(FIELD1, FIELD2); + } + + @Test + public void testHashCode_testEquals() { + SelectorKey key2 = new SelectorKey(FIELD1, FIELD2); + assertEquals(key, key2); + assertEquals(key.hashCode(), key2.hashCode()); + + key2 = new SelectorKey(FIELD1, FIELDX); + assertNotEquals(key, key2); + assertNotEquals(key.hashCode(), key2.hashCode()); + + // test empty key + key = new SelectorKey(); + key2 = new SelectorKey(); + assertEquals(key, key2); + assertEquals(key.hashCode(), key2.hashCode()); + } + + @Test + public void testExtractField() { + Map<String, Object> map = Map.of("hello", "world", FIELD1, Map.of("another", "", FIELD2, "value B")); + StandardCoderObject sco = Util.translate("", map, StandardCoderObject.class); + + String result = key.extractField(sco); + assertNotNull(result); + assertEquals("value B", result); + + // shorter key + assertEquals("world", new SelectorKey("hello").extractField(sco)); + assertNull(new SelectorKey("bye").extractField(sco)); + + // not found + assertNull(new SelectorKey(FIELD1, "not field 2").extractField(sco)); + + // test with empty key + assertNull(new SelectorKey().extractField(sco)); + } + + @Getter + @Setter + @Builder + protected static class Data { + private String text; + private Map<String, String> map; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java new file mode 100644 index 000000000..a37085799 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java @@ -0,0 +1,154 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; + +public class TopicListenerImplTest { + private static final StandardCoder coder = new StandardCoder(); + private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; + private static final String MY_TOPIC = "my-topic"; + private static final String KEY1 = "requestId"; + private static final String KEY2 = "container"; + private static final String SUBKEY = "subRequestId"; + + private static final String VALUEA_REQID = "hello"; + private static final String VALUEA_SUBREQID = "world"; + + private static final String VALUEB_REQID = "bye"; + + private Forwarder forwarder1; + private Forwarder forwarder2; + private TopicListenerImpl topic; + + @Mock + private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1; + + @Mock + private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1b; + + @Mock + private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener2; + + + /** + * Sets up. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + topic = new TopicListenerImpl(); + + forwarder1 = topic.addForwarder(new SelectorKey(KEY1)); + forwarder2 = topic.addForwarder(new SelectorKey(KEY1), new SelectorKey(KEY2, SUBKEY)); + + assertNotNull(forwarder1); + assertNotNull(forwarder2); + assertNotSame(forwarder1, forwarder2); + + forwarder1.register(Arrays.asList(VALUEA_REQID), listener1); + forwarder1.register(Arrays.asList(VALUEB_REQID), listener1b); + forwarder2.register(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener2); + } + + @Test + public void testShutdown() { + // shut it down, which should clear all forwarders + topic.shutdown(); + + // should get a new forwarder now + Forwarder forwarder = topic.addForwarder(new SelectorKey(KEY1)); + assertNotSame(forwarder1, forwarder); + assertNotSame(forwarder2, forwarder); + + // new forwarder should be unchanged + assertSame(forwarder, topic.addForwarder(new SelectorKey(KEY1))); + } + + @Test + public void testAddForwarder() { + assertSame(forwarder1, topic.addForwarder(new SelectorKey(KEY1))); + assertSame(forwarder2, topic.addForwarder(new SelectorKey(KEY1), new SelectorKey(KEY2, SUBKEY))); + } + + @Test + public void testOnTopicEvent() { + /* + * send a message that should go to listener1 on forwarder1 and listener2 on + * forwarder2 + */ + String msg = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); + topic.onTopicEvent(INFRA, MY_TOPIC, msg); + + verify(listener1).accept(eq(INFRA), eq(msg), any()); + verify(listener2).accept(eq(INFRA), eq(msg), any()); + + // not to listener1b + verify(listener1b, never()).accept(any(), any(), any()); + + /* + * now send a message that should only go to listener1b on forwarder1 + */ + msg = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); + topic.onTopicEvent(INFRA, MY_TOPIC, msg); + + // should route to listener1 on forwarder1 and listener2 on forwarder2 + verify(listener1b).accept(eq(INFRA), eq(msg), any()); + + // try one where the coder throws an exception + topic.onTopicEvent(INFRA, MY_TOPIC, "{invalid-json"); + + // no extra invocations + verify(listener1).accept(any(), any(), any()); + verify(listener1b).accept(any(), any(), any()); + verify(listener2).accept(any(), any(), any()); + } + + /** + * Makes a message from a map. + */ + private String makeMessage(Map<String, Object> map) { + try { + return coder.encode(map); + } catch (CoderException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java new file mode 100644 index 000000000..c6557d0c9 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java @@ -0,0 +1,158 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; + +public class TopicPairTest { + private static final String UNKNOWN = "unknown"; + private static final String MY_SOURCE = "pair-source"; + private static final String MY_TARGET = "pair-target"; + private static final String TEXT = "some text"; + + @Mock + private TopicSink publisher1; + + @Mock + private TopicSink publisher2; + + @Mock + private TopicSource subscriber1; + + @Mock + private TopicSource subscriber2; + + @Mock + private TopicEndpoint mgr; + + private TopicPair pair; + + + /** + * Sets up. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + when(mgr.getTopicSinks(MY_TARGET)).thenReturn(Arrays.asList(publisher1, publisher2)); + when(mgr.getTopicSources(eq(Arrays.asList(MY_SOURCE)))).thenReturn(Arrays.asList(subscriber1, subscriber2)); + + when(publisher1.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.NOOP); + when(publisher2.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.UEB); + + pair = new MyTopicPair(MY_SOURCE, MY_TARGET); + + pair.start(); + } + + @Test + public void testTopicPair_testGetSource_testGetTarget() { + assertEquals(MY_SOURCE, pair.getSource()); + assertEquals(MY_TARGET, pair.getTarget()); + + verify(mgr).getTopicSinks(anyString()); + verify(mgr).getTopicSources(any()); + + // source not found + assertThatIllegalArgumentException().isThrownBy(() -> new MyTopicPair(UNKNOWN, MY_TARGET)) + .withMessageContaining("sources").withMessageContaining(UNKNOWN); + + // target not found + assertThatIllegalArgumentException().isThrownBy(() -> new MyTopicPair(MY_SOURCE, UNKNOWN)) + .withMessageContaining("sinks").withMessageContaining(UNKNOWN); + } + + @Test + public void testShutdown() { + pair.shutdown(); + verify(subscriber1).unregister(pair); + verify(subscriber2).unregister(pair); + } + + @Test + public void testStart() { + verify(subscriber1).register(pair); + verify(subscriber2).register(pair); + } + + @Test + public void testStop() { + pair.stop(); + verify(subscriber1).unregister(pair); + verify(subscriber2).unregister(pair); + } + + @Test + public void testPublish() { + List<CommInfrastructure> infrastructures = pair.publish(TEXT); + assertEquals(Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB), infrastructures); + + verify(publisher1).send(TEXT); + verify(publisher2).send(TEXT); + + // first one throws an exception - should have only published to the second + when(publisher1.send(any())).thenThrow(new IllegalStateException("expected exception")); + + infrastructures = pair.publish(TEXT); + assertEquals(Arrays.asList(CommInfrastructure.UEB), infrastructures); + + verify(publisher2, times(2)).send(TEXT); + } + + @Test + public void testGetTopicEndpointManager() { + // setting "mgr" to null should cause it to use the superclass' method + mgr = null; + assertNotNull(pair.getTopicEndpointManager()); + } + + + private class MyTopicPair extends TopicPair { + public MyTopicPair(String source, String target) { + super(source, target); + } + + @Override + protected TopicEndpoint getTopicEndpointManager() { + return (mgr != null ? mgr : super.getTopicEndpointManager()); + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml index 860468821..b11983bed 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml +++ b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml @@ -44,4 +44,11 @@ <logger name="org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperation" level="info" additivity="false"> <appender-ref ref="STDOUT" /> </logger> + + <!-- this is required for TopicPairOperationTest --> + <logger + name="org.onap.policy.controlloop.actorserviceprovider.impl.TopicPairOperation" + level="info" additivity="false"> + <appender-ref ref="STDOUT" /> + </logger> </configuration> |