diff options
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src')
35 files changed, 1735 insertions, 1682 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java index 2886b1feb..24c2cfc23 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/ActorService.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; * {@link #start()} to start all of the actors. When finished using the actor service, * invoke {@link #stop()} or {@link #shutdown()}. */ -public class ActorService extends StartConfigPartial<Map<String, Object>> { +public class ActorService extends StartConfigPartial<Map<String, Map<String, Object>>> { private static final Logger logger = LoggerFactory.getLogger(ActorService.class); private final Map<String, Actor> name2actor; @@ -116,14 +116,14 @@ public class ActorService extends StartConfigPartial<Map<String, Object>> { } @Override - protected void doConfigure(Map<String, Object> parameters) { + protected void doConfigure(Map<String, Map<String, Object>> parameters) { logger.info("configuring actors"); BeanValidationResult valres = new BeanValidationResult("ActorService", parameters); for (Actor actor : name2actor.values()) { String actorName = actor.getName(); - Map<String, Object> subparams = Util.translateToMap(actorName, parameters.get(actorName)); + Map<String, Object> subparams = parameters.get(actorName); if (subparams != null) { diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java new file mode 100644 index 000000000..1e44a170c --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActor.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager; + +/** + * Actor that uses a bidirectional topic. The actor's parameters must be a + * {@link BidirectionalTopicActorParams}. + */ +public class BidirectionalTopicActor extends ActorImpl implements BidirectionalTopicManager { + + /** + * Maps a pair of sink and source topic names to their bidirectional topic. + */ + private final Map<Pair<String, String>, BidirectionalTopicHandler> params2topic = new ConcurrentHashMap<>(); + + + /** + * Constructs the object. + * + * @param name actor's name + */ + public BidirectionalTopicActor(String name) { + super(name); + } + + @Override + protected void doStart() { + params2topic.values().forEach(BidirectionalTopicHandler::start); + super.doStart(); + } + + @Override + protected void doStop() { + params2topic.values().forEach(BidirectionalTopicHandler::stop); + super.doStop(); + } + + @Override + protected void doShutdown() { + params2topic.values().forEach(BidirectionalTopicHandler::shutdown); + params2topic.clear(); + super.doShutdown(); + } + + @Override + public BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic) { + Pair<String, String> key = Pair.of(sinkTopic, sourceTopic); + + return params2topic.computeIfAbsent(key, pair -> { + try { + return makeTopicHandler(sinkTopic, sourceTopic); + } catch (BidirectionalTopicClientException e) { + throw new IllegalArgumentException(e); + } + }); + } + + /** + * Translates the parameters to a {@link BidirectionalTopicActorParams} and then + * creates a function that will extract operator-specific parameters. + */ + @Override + protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { + String actorName = getName(); + + // @formatter:off + return Util.translate(actorName, actorParameters, BidirectionalTopicActorParams.class) + .doValidation(actorName) + .makeOperationParameters(actorName); + // @formatter:on + } + + // may be overridden by junit tests + + protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic) + throws BidirectionalTopicClientException { + + return new BidirectionalTopicHandler(sinkTopic, sourceTopic); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java index 6b584d7c6..f82015d6b 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java @@ -24,40 +24,42 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import lombok.Getter; -import org.apache.commons.lang3.tuple.Triple; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.NetLoggerUtil; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; -import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Operation that uses a Topic pair. + * Operation that uses a bidirectional topic. * * @param <S> response type */ @Getter -public abstract class TopicPairOperation<Q, S> extends OperationPartial { - private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class); - private static final Coder coder = new StandardCoder(); +public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial { + private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class); + + /** + * Response status. + */ + public enum Status { + SUCCESS, FAILURE, STILL_WAITING + } // fields extracted from the operator - private final TopicPair topicPair; + private final BidirectionalTopicHandler topicHandler; private final Forwarder forwarder; - private final TopicPairParams pairParams; + private final BidirectionalTopicParams topicParams; private final long timeoutMs; /** @@ -73,13 +75,14 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { * @param operator operator that created this operation * @param clazz response class */ - public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) { + public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator, + Class<S> clazz) { super(params, operator); - this.topicPair = operator.getTopicPair(); + this.topicHandler = operator.getTopicHandler(); this.forwarder = operator.getForwarder(); - this.pairParams = operator.getParams(); + this.topicParams = operator.getParams(); this.responseClass = clazz; - this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS); + this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS); } /** @@ -101,18 +104,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request); final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future = - new CompletableFuture<>(); final Executor executor = params.getExecutor(); // register a listener BEFORE publishing - // @formatter:off - TriConsumer<CommInfrastructure, String, StandardCoderObject> listener = - (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse)); - // @formatter:on - - // TODO this currently only allows a single matching response + BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> { + OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse); + if (latestOutcome != null) { + // final response - complete the controller + controller.completeAsync(() -> latestOutcome, executor); + } + }; forwarder.register(expectedKeyValues, listener); @@ -128,16 +130,6 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { throw e; } - - // once "future" completes, process the response, and then complete the controller - - // @formatter:off - future.thenApplyAsync( - triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()), - executor) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on - return controller; } @@ -175,12 +167,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { throw new IllegalArgumentException("cannot encode request", e); } - List<CommInfrastructure> list = topicPair.publish(json); - if (list.isEmpty()) { + if (!topicHandler.send(json)) { throw new IllegalStateException("nothing published"); } - logTopicRequest(list, request); + logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request); } /** @@ -190,15 +181,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { * @param outcome outcome to be populated * @param response raw response to process * @param scoResponse response, as a {@link StandardCoderObject} - * @return the outcome + * @return the outcome, or {@code null} if still waiting for completion */ - protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse, + protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse, StandardCoderObject scoResponse) { logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); - logTopicResponse(infra, rawResponse); + logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(), + rawResponse); + // decode the response S response; if (responseClass == String.class) { response = responseClass.cast(rawResponse); @@ -216,17 +209,26 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { } } - if (!isSuccess(rawResponse, response)) { - logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), - params.getRequestId()); - return setOutcome(outcome, PolicyResult.FAILURE); - } + // check its status + switch (detmStatus(rawResponse, response)) { + case SUCCESS: + logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + setOutcome(outcome, PolicyResult.SUCCESS); + postProcessResponse(outcome, rawResponse, response); + return outcome; - logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); - setOutcome(outcome, PolicyResult.SUCCESS); - postProcessResponse(outcome, rawResponse, response); + case FAILURE: + logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + return setOutcome(outcome, PolicyResult.FAILURE); - return outcome; + case STILL_WAITING: + default: + logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(), + params.getRequestId()); + return null; + } } /** @@ -241,76 +243,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial { } /** - * Determines if the response indicates success. + * Determines the status of the response. * * @param rawResponse raw response * @param response decoded response - * @return {@code true} if the response indicates success, {@code false} otherwise - */ - protected abstract boolean isSuccess(String rawResponse, S response); - - /** - * Logs a TOPIC request. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param infrastructures list of communication infrastructures on which it was - * published - * @param request request to be logged - */ - protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) { - if (infrastructures.isEmpty()) { - return; - } - - String json; - try { - if (request == null) { - json = null; - } else if (request instanceof String) { - json = request.toString(); - } else { - json = makeCoder().encode(request, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print request", e); - json = request.toString(); - } - - for (CommInfrastructure infra : infrastructures) { - logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json); - } - } - - /** - * Logs a TOPIC response. If the response is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param infra communication infrastructure on which the response was received - * @param response response to be logged + * @return the status of the response */ - protected <T> void logTopicResponse(CommInfrastructure infra, T response) { - String json; - try { - if (response == null) { - json = null; - } else if (response instanceof String) { - json = response.toString(); - } else { - json = makeCoder().encode(response, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print response", e); - json = response.toString(); - } - - logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json); - } - - // these may be overridden by junit tests - - protected Coder makeCoder() { - return coder; - } + protected abstract Status detmStatus(String rawResponse, S response); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java index 8ce013388..51689e49b 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperator.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperator.java @@ -28,24 +28,24 @@ import lombok.Getter; import org.onap.policy.common.parameters.ValidationResult; import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager; import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; /** - * Operator that uses a pair of topics, one for publishing the request, and another for - * receiving the response. Topic operators may share a {@link TopicPair}. + * Operator that uses a bidirectional topic. Topic operators may share a + * {@link BidirectionalTopicHandler}. */ -public abstract class TopicPairOperator extends OperatorPartial { +public abstract class BidirectionalTopicOperator extends OperatorPartial { /** - * Manager from which to get the topic pair. + * Manager from which to get the topic handlers. */ - private final TopicPairManager pairManager; + private final BidirectionalTopicManager topicManager; /** * Keys used to extract the fields used to select responses for this operator. @@ -62,13 +62,13 @@ public abstract class TopicPairOperator extends OperatorPartial { * will not, thus operations may copy it. */ @Getter - private TopicPairParams params; + private BidirectionalTopicParams params; /** - * Topic pair associated with the parameters. + * Topic handler associated with the parameters. */ @Getter - private TopicPair topicPair; + private BidirectionalTopicHandler topicHandler; /** * Forwarder associated with the parameters. @@ -82,27 +82,27 @@ public abstract class TopicPairOperator extends OperatorPartial { * * @param actorName name of the actor with which this operator is associated * @param name operation name - * @param pairManager manager from which to get the topic pair + * @param topicManager manager from which to get the topic handler * @param selectorKeys keys used to extract the fields used to select responses for * this operator */ - public TopicPairOperator(String actorName, String name, TopicPairManager pairManager, + public BidirectionalTopicOperator(String actorName, String name, BidirectionalTopicManager topicManager, List<SelectorKey> selectorKeys) { super(actorName, name); - this.pairManager = pairManager; + this.topicManager = topicManager; this.selectorKeys = selectorKeys; } @Override protected void doConfigure(Map<String, Object> parameters) { - params = Util.translate(getFullName(), parameters, TopicPairParams.class); + params = Util.translate(getFullName(), parameters, BidirectionalTopicParams.class); ValidationResult result = params.validate(getFullName()); if (!result.isValid()) { throw new ParameterValidationRuntimeException("invalid parameters", result); } - topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget()); - forwarder = topicPair.addForwarder(selectorKeys); + topicHandler = topicManager.getTopicHandler(params.getSinkTopic(), params.getSourceTopic()); + forwarder = topicHandler.addForwarder(selectorKeys); } /** @@ -112,19 +112,21 @@ public abstract class TopicPairOperator extends OperatorPartial { * @param <S> response type * @param actorName actor name * @param operation operation name - * @param pairManager manager from which to get the topic pair + * @param topicManager manager from which to get the topic handler * @param operationMaker function to make an operation * @param keys keys used to extract the fields used to select responses for this * operator * @return a new operator */ // @formatter:off - public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, - BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker, + public static <Q, S> BidirectionalTopicOperator makeOperator(String actorName, String operation, + BidirectionalTopicManager topicManager, + BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator, + BidirectionalTopicOperation<Q, S>> operationMaker, SelectorKey... keys) { // @formatter:off - return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker); + return makeOperator(actorName, operation, topicManager, Arrays.asList(keys), operationMaker); } /** @@ -134,19 +136,21 @@ public abstract class TopicPairOperator extends OperatorPartial { * @param <S> response type * @param actorName actor name * @param operation operation name - * @param pairManager manager from which to get the topic pair + * @param topicManager manager from which to get the topic handler * @param keys keys used to extract the fields used to select responses for * this operator * @param operationMaker function to make an operation * @return a new operator */ // @formatter:off - public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager, + public static <Q,S> BidirectionalTopicOperator makeOperator(String actorName, String operation, + BidirectionalTopicManager topicManager, List<SelectorKey> keys, - BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) { + BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator, + BidirectionalTopicOperation<Q,S>> operationMaker) { // @formatter:on - return new TopicPairOperator(actorName, operation, pairManager, keys) { + return new BidirectionalTopicOperator(actorName, operation, topicManager, keys) { @Override public synchronized Operation buildOperation(ControlLoopOperationParams params) { return operationMaker.apply(params, this); diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java index ba75f0be6..f1829d79a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java @@ -33,9 +33,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; -import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; @@ -52,7 +50,6 @@ import org.slf4j.LoggerFactory; @Getter public abstract class HttpOperation<T> extends OperationPartial { private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class); - private static final Coder coder = new StandardCoder(); /** * Operator that created this operation. @@ -171,7 +168,7 @@ public abstract class HttpOperation<T> extends OperationPartial { String strResponse = HttpClient.getBody(rawResponse, String.class); - logRestResponse(url, strResponse); + logMessage(EventType.IN, CommInfrastructure.REST, url, strResponse); T response; if (responseClass == String.class) { @@ -224,63 +221,10 @@ public abstract class HttpOperation<T> extends OperationPartial { return (rawResponse.getStatus() == 200); } - /** - * Logs a REST request. If the request is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param url request URL - * @param request request to be logged - */ - public <Q> void logRestRequest(String url, Q request) { - String json; - try { - if (request == null) { - json = null; - } else if (request instanceof String) { - json = request.toString(); - } else { - json = makeCoder().encode(request, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print request", e); - json = request.toString(); - } - - NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, json); - logger.info("[OUT|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); - } - - /** - * Logs a REST response. If the response is not of type, String, then it attempts to - * pretty-print it into JSON before logging. - * - * @param url request URL - * @param response response to be logged - */ - public <S> void logRestResponse(String url, S response) { - String json; - try { - if (response == null) { - json = null; - } else if (response instanceof String) { - json = response.toString(); - } else { - json = makeCoder().encode(response, true); - } - - } catch (CoderException e) { - logger.warn("cannot pretty-print response", e); - json = response.toString(); - } - - NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, json); - logger.info("[IN|{}|{}|]{}{}", CommInfrastructure.REST, url, NetLoggerUtil.SYSTEM_LS, json); - } - - // these may be overridden by junit tests - - protected Coder makeCoder() { - return coder; + @Override + public <Q> String logMessage(EventType direction, CommInfrastructure infra, String sink, Q request) { + String json = super.logMessage(direction, infra, sink, request); + NetLoggerUtil.log(direction, infra, sink, json); + return json; } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java index d00b88bb5..0b3497197 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -20,7 +20,12 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -28,6 +33,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; import org.onap.policy.controlloop.actorserviceprovider.Operation; @@ -53,8 +66,8 @@ import org.slf4j.LoggerFactory; * be done to cancel that particular operation. */ public abstract class OperationPartial implements Operation { - private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class); + private static final Coder coder = new StandardCoder(); public static final long DEFAULT_RETRY_WAIT_MS = 1000L; // values extracted from the operator @@ -470,103 +483,110 @@ public abstract class OperationPartial implements Operation { * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels * any outstanding futures when one completes. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled */ - protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + protected CompletableFuture<OperationOutcome> anyOf( + @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) { - @SuppressWarnings("unchecked") - CompletableFuture<OperationOutcome> result = anyOf(arrFutures); - return result; + return anyOf(Arrays.asList(futureMakers)); } /** - * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any - * outstanding futures when one completes. + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled. Similarly, when this future completes, any incomplete futures + * will be canceled */ protected CompletableFuture<OperationOutcome> anyOf( - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) { + + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + CompletableFuture<OperationOutcome>[] futures = + attachFutures(controller, futureMakers, UnaryOperator.identity()); + + if (futures.length == 0) { + // no futures were started + return null; + } if (futures.length == 1) { return futures[0]; } - final Executor executor = params.getExecutor(); - final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); - - // @formatter:off - CompletableFuture.anyOf(futures) - .thenApply(object -> (OperationOutcome) object) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on + CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome) + .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); return controller; } /** - * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels - * the futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled */ - protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + protected CompletableFuture<OperationOutcome> allOf( + @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) { - @SuppressWarnings("unchecked") - CompletableFuture<OperationOutcome> result = allOf(arrFutures); - return result; + return allOf(Arrays.asList(futureMakers)); } /** - * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the - * futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled. Similarly, when this future completes, any incomplete futures + * will be canceled */ protected CompletableFuture<OperationOutcome> allOf( - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { - - if (futures.length == 1) { - return futures[0]; - } + List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) { + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); + Queue<OperationOutcome> outcomes = new LinkedList<>(); - OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + CompletableFuture<OperationOutcome>[] futures = + attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> { + synchronized (outcomes) { + outcomes.add(outcome); + } + return outcome; + })); - @SuppressWarnings("rawtypes") - CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + if (futures.length == 0) { + // no futures were started + return null; + } - // record the outcomes of each future when it completes - for (int count = 0; count < futures2.length; ++count) { - final int count2 = count; - futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); + if (futures.length == 1) { + return futures[0]; } // @formatter:off - CompletableFuture.allOf(futures2) + CompletableFuture.allOf(futures) .thenApply(unused -> combineOutcomes(outcomes)) .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); // @formatter:on @@ -575,22 +595,62 @@ public abstract class OperationPartial implements Operation { } /** - * Attaches the given futures to the controller. + * Invokes the functions to create the futures and attaches them to the controller. * * @param controller master controller for all of the futures - * @param futures futures to be attached to the controller - */ - private void attachFutures(PipelineControllerFuture<OperationOutcome> controller, - @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) { + * @param futureMakers futures to be attached to the controller + * @param adorn function that "adorns" the future, possible adding onto its pipeline. + * Returns the adorned future + * @return an array of futures, possibly zero-length. If the array is of size one, + * then that one item should be returned instead of the controller + */ + private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller, + List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers, + UnaryOperator<CompletableFuture<OperationOutcome>> adorn) { + + if (futureMakers.isEmpty()) { + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0]; + return result; + } - if (futures.length == 0) { - throw new IllegalArgumentException("empty list of futures"); + // the last, unadorned future that is created + CompletableFuture<OperationOutcome> lastFuture = null; + + List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size()); + + // make each future + for (var maker : futureMakers) { + try { + CompletableFuture<OperationOutcome> future = maker.get(); + if (future == null) { + continue; + } + + // propagate "stop" to the future + controller.add(future); + + futures.add(adorn.apply(future)); + + lastFuture = future; + + } catch (RuntimeException e) { + logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId()); + controller.cancel(false); + throw e; + } } - // attach each task - for (CompletableFuture<OperationOutcome> future : futures) { - controller.add(future); + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()]; + + if (result.length == 1) { + // special case - return the unadorned future + result[0] = lastFuture; + return result; } + + return futures.toArray(result); } /** @@ -599,15 +659,13 @@ public abstract class OperationPartial implements Operation { * @param outcomes outcomes to be examined * @return the combined outcome */ - private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) { + private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) { // identify the outcome with the highest priority - OperationOutcome outcome = outcomes[0]; + OperationOutcome outcome = outcomes.remove(); int priority = detmPriority(outcome); - // start with "1", as we've already dealt with "0" - for (int count = 1; count < outcomes.length; ++count) { - OperationOutcome outcome2 = outcomes[count]; + for (OperationOutcome outcome2 : outcomes) { int priority2 = detmPriority(outcome2); if (priority2 > priority) { @@ -656,72 +714,114 @@ public abstract class OperationPartial implements Operation { } /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. + * Performs a sequence of tasks, stopping if a task fails. A given task's future is + * not created until the previous task completes. The pipeline returns the outcome of + * the last task executed. * - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param outcome outcome of the previous task - * @param task task to be performed - * @return the task, if everything checks out. Otherwise, it returns an incomplete - * future and completes the controller instead + * @param futureMakers functions to make the futures + * @return a future to cancel the sequence or await the outcome */ - // @formatter:off - protected CompletableFuture<OperationOutcome> doTask( - PipelineControllerFuture<OperationOutcome> controller, - boolean checkSuccess, OperationOutcome outcome, - CompletableFuture<OperationOutcome> task) { - // @formatter:on + protected CompletableFuture<OperationOutcome> sequence( + @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) { - if (checkSuccess && !isSuccess(outcome)) { - /* - * must complete before canceling so that cancel() doesn't cause controller to - * complete - */ - controller.complete(outcome); - task.cancel(false); - return new CompletableFuture<>(); + return sequence(Arrays.asList(futureMakers)); + } + + /** + * Performs a sequence of tasks, stopping if a task fails. A given task's future is + * not created until the previous task completes. The pipeline returns the outcome of + * the last task executed. + * + * @param futureMakers functions to make the futures + * @return a future to cancel the sequence or await the outcome, or {@code null} if + * there were no tasks to perform + */ + protected CompletableFuture<OperationOutcome> sequence( + List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) { + + Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers); + + CompletableFuture<OperationOutcome> nextTask = getNextTask(queue); + if (nextTask == null) { + // no tasks + return null; + } + + if (queue.isEmpty()) { + // only one task - just return it rather than wrapping it in a controller + return nextTask; } - return controller.wrap(task); + /* + * multiple tasks - need a controller to stop whichever task is currently + * executing + */ + final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + final Executor executor = params.getExecutor(); + + // @formatter:off + controller.wrap(nextTask) + .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; } /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. + * Executes the next task in the queue, if the previous outcome was successful. * - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param task function to start the task to be performed - * @return a function to perform the task. If everything checks out, then it returns - * the task. Otherwise, it returns an incomplete future and completes the - * controller instead + * @param controller pipeline controller + * @param taskQueue queue of tasks to be performed + * @return a future to execute the remaining tasks, or the current outcome, if it's a + * failure, or if there are no more tasks */ - // @formatter:off - protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask( + private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess( PipelineControllerFuture<OperationOutcome> controller, - boolean checkSuccess, - Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) { - // @formatter:on + Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) { return outcome -> { - - if (!controller.isRunning()) { - return new CompletableFuture<>(); + if (!isSuccess(outcome)) { + // return the failure + return CompletableFuture.completedFuture(outcome); } - if (checkSuccess && !isSuccess(outcome)) { - controller.complete(outcome); - return new CompletableFuture<>(); + CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue); + if (nextTask == null) { + // no tasks - just return the success + return CompletableFuture.completedFuture(outcome); } - return controller.wrap(task.apply(outcome)); + // @formatter:off + return controller + .wrap(nextTask) + .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor()); + // @formatter:on }; } /** + * Gets the next task from the queue, skipping those that are {@code null}. + * + * @param taskQueue task queue + * @return the next task, or {@code null} if the queue is now empty + */ + private CompletableFuture<OperationOutcome> getNextTask( + Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) { + + Supplier<CompletableFuture<OperationOutcome>> maker; + + while ((maker = taskQueue.poll()) != null) { + CompletableFuture<OperationOutcome> future = maker.get(); + if (future != null) { + return future; + } + } + + return null; + } + + /** * Sets the start time of the operation and invokes the callback to indicate that the * operation has started. Does nothing if the pipeline has been stopped. * <p/> @@ -809,6 +909,38 @@ public abstract class OperationPartial implements Operation { return (thrown instanceof TimeoutException); } + /** + * Logs a response. If the response is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param direction IN or OUT + * @param infra communication infrastructure on which it was published + * @param source source name (e.g., the URL or Topic name) + * @param response response to be logged + * @return the JSON text that was logged + */ + public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) { + String json; + try { + if (response == null) { + json = null; + } else if (response instanceof String) { + json = response.toString(); + } else { + json = makeCoder().encode(response, true); + } + + } catch (CoderException e) { + String type = (direction == EventType.IN ? "response" : "request"); + logger.warn("cannot pretty-print {}", type, e); + json = response.toString(); + } + + logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json); + + return json; + } + // these may be overridden by subclasses or junit tests /** @@ -841,4 +973,10 @@ public abstract class OperationPartial implements Operation { protected long getTimeoutMs(Integer timeoutSec) { return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); } + + // these may be overridden by junit tests + + protected Coder makeCoder() { + return coder; + } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java deleted file mode 100644 index c3e1e5c4d..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairActor.java +++ /dev/null @@ -1,112 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.impl; - -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; -import org.apache.commons.lang3.tuple.Pair; -import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.controlloop.actorserviceprovider.Util; -import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairActorParams; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; - -/** - * Actor that uses a topic pair. The actor's parameters must be a - * {@link TopicPairActorParams}. - */ -public class TopicPairActor extends ActorImpl implements TopicPairManager { - - /** - * Maps a topic source and target name to its topic pair. - */ - private final Map<Pair<String, String>, TopicPair> params2topic = new ConcurrentHashMap<>(); - - - /** - * Constructs the object. - * - * @param name actor's name - */ - public TopicPairActor(String name) { - super(name); - } - - @Override - protected void doStart() { - params2topic.values().forEach(TopicPair::start); - super.doStart(); - } - - @Override - protected void doStop() { - params2topic.values().forEach(TopicPair::stop); - super.doStop(); - } - - @Override - protected void doShutdown() { - params2topic.values().forEach(TopicPair::shutdown); - params2topic.clear(); - super.doShutdown(); - } - - @Override - public TopicPair getTopicPair(String source, String target) { - Pair<String, String> key = Pair.of(source, target); - return params2topic.computeIfAbsent(key, pair -> new TopicPair(source, target)); - } - - /** - * Translates the parameters to a {@link TopicPairActorParams} and then creates a - * function that will extract operator-specific parameters. - */ - @Override - protected Function<String, Map<String, Object>> makeOperatorParameters(Map<String, Object> actorParameters) { - String actorName = getName(); - - TopicPairActorParams params = Util.translate(actorName, actorParameters, TopicPairActorParams.class); - ValidationResult result = params.validate(getName()); - if (!result.isValid()) { - throw new ParameterValidationRuntimeException("invalid parameters", result); - } - - // create a map of the default parameters - Map<String, Object> defaultParams = Util.translateToMap(getName(), params.getDefaults()); - Map<String, Map<String, Object>> operations = params.getOperation(); - - return operationName -> { - Map<String, Object> specificParams = operations.get(operationName); - if (specificParams == null) { - return null; - } - - // start with a copy of defaults and overlay with specific - Map<String, Object> subparams = new TreeMap<>(defaultParams); - subparams.putAll(specificParams); - - return Util.translateToMap(getName() + "." + operationName, subparams); - }; - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java new file mode 100644 index 000000000..291aeeb23 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParams.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.parameters.annotations.Min; + +/** + * Parameters used by Actors whose Operators use bidirectional topic. + */ +@Getter +@Setter +@EqualsAndHashCode(callSuper = true) +public class BidirectionalTopicActorParams extends CommonActorParams { + + /* + * Optional, default values that are used if missing from the operation-specific + * parameters. + */ + + /** + * Sink topic name to which requests should be published. + */ + private String sinkTopic; + + /** + * Source topic name, from which to read responses. + */ + private String sourceTopic; + + /** + * Amount of time, in seconds, to wait for the HTTP request to complete. The default + * is 90 seconds. + */ + @Min(1) + private int timeoutSec = 90; +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java index 33fcf3052..cafca1fa6 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParams.java @@ -29,31 +29,33 @@ import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; /** - * Parameters used by Operators that use a pair of Topics, one to publish requests and the - * other to receive responses. + * Parameters used by Operators that use a bidirectional topic. */ @NotNull @NotBlank @Data @Builder(toBuilder = true) -public class TopicPairParams { +public class BidirectionalTopicParams { /** - * Source topic end point, from which to read responses. + * Sink topic name to which requests should be published. */ - private String source; + private String sinkTopic; /** - * Name of the target topic end point to which requests should be published. + * Source topic name, from which to read responses. */ - private String target; + private String sourceTopic; /** - * Amount of time, in seconds to wait for the response. The default is five minutes. + * Amount of time, in seconds to wait for the response. + * <p/> + * Note: this should NOT have a default value, as it receives its default value from + * {@link BidirectionalTopicActorParams}. */ @Min(1) - @Builder.Default - private int timeoutSec = 300; + private int timeoutSec; + /** * Validates the parameters. diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java index 42a44ee9c..dc6f2b657 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParams.java @@ -21,37 +21,57 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; import java.util.Map; -import lombok.Builder; -import lombok.Data; -import org.onap.policy.common.parameters.BeanValidationResult; +import java.util.TreeMap; +import java.util.function.Function; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.parameters.BeanValidator; import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; +import org.onap.policy.controlloop.actorserviceprovider.Util; /** - * Parameters used by Actors whose Operators use a pair of Topics, one to publish requests - * and the other to receive responses. + * Superclass for Actor parameters that have default values in "this" object, and + * operation-specific values in {@link #operation}. */ -@NotNull -@NotBlank -@Data -@Builder -public class TopicPairActorParams { +@Getter +@Setter +@EqualsAndHashCode +public class CommonActorParams { /** - * This contains the default parameters that are used when an operation doesn't - * specify them. Note: each operation to be used must still have an entry in - * {@link #operation}, even if it's empty. Otherwise, the given operation will not be - * started. + * Maps the operation name to its parameters. */ - private TopicPairParams defaults; + @NotNull + protected Map<String, Map<String, Object>> operation; + /** - * Maps an operation name to its individual parameters. + * Extracts a specific operation's parameters from "this". + * + * @param name name of the item containing "this" + * @return a function to extract an operation's parameters from "this". Note: the + * returned function is not thread-safe */ - private Map<String, Map<String, Object>> operation; + public Function<String, Map<String, Object>> makeOperationParameters(String name) { + + Map<String, Object> defaultParams = Util.translateToMap(name, this); + defaultParams.remove("operation"); + + return operationName -> { + Map<String, Object> specificParams = operation.get(operationName); + if (specificParams == null) { + return null; + } + + // start with a copy of defaults and overlay with specific + Map<String, Object> subparams = new TreeMap<>(defaultParams); + subparams.putAll(specificParams); + return Util.translateToMap(name + "." + operationName, subparams); + }; + } /** * Validates the parameters. @@ -60,7 +80,7 @@ public class TopicPairActorParams { * @return "this" * @throws IllegalArgumentException if the parameters are invalid */ - public TopicPairActorParams doValidation(String name) { + public CommonActorParams doValidation(String name) { ValidationResult result = validate(name); if (!result.isValid()) { throw new ParameterValidationRuntimeException("invalid parameters", result); @@ -77,17 +97,6 @@ public class TopicPairActorParams { * @return the validation result */ public ValidationResult validate(String resultName) { - BeanValidationResult result = new BeanValidator().validateTop(resultName, this); - - if (defaults != null) { - result.addResult(defaults.validate("defaults")); - } - - // @formatter:off - result.validateMap("operation", operation, - (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); - // @formatter:on - - return result; + return new BeanValidator().validateTop(resultName, this); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java index 275c8bc4e..d589e1d7e 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParams.java @@ -20,26 +20,23 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; -import java.util.Map; -import java.util.function.Function; -import lombok.Data; -import org.onap.policy.common.parameters.BeanValidationResult; -import org.onap.policy.common.parameters.BeanValidator; -import org.onap.policy.common.parameters.ValidationResult; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotBlank; -import org.onap.policy.common.parameters.annotations.NotNull; -import org.onap.policy.controlloop.actorserviceprovider.Util; /** - * Parameters used by Actors that connect to a server via HTTP. This contains the - * parameters that are common to all of the operations. Only the path changes for each - * operation, thus it includes a mapping from operation name to path. + * Parameters used by Actors that connect to a server via HTTP. */ -@Data -@NotNull -@NotBlank -public class HttpActorParams { +@Getter +@Setter +@EqualsAndHashCode(callSuper = true) +public class HttpActorParams extends CommonActorParams { + + /* + * Optional, default values that are used if missing from the operation-specific + * parameters. + */ /** * Name of the HttpClient, as found in the HttpClientFactory. @@ -47,66 +44,9 @@ public class HttpActorParams { private String clientName; /** - * Amount of time, in seconds to wait for the HTTP request to complete, where zero - * indicates that it should wait forever. The default is zero. - */ - @Min(0) - private int timeoutSec = 0; - - /** - * Maps the operation name to its URI path. - */ - private Map<String, String> path; - - /** - * Extracts a specific operation's parameters from "this". - * - * @param name name of the item containing "this" - * @return a function to extract an operation's parameters from "this". Note: the - * returned function is not thread-safe - */ - public Function<String, Map<String, Object>> makeOperationParameters(String name) { - HttpParams subparams = HttpParams.builder().clientName(getClientName()).timeoutSec(getTimeoutSec()).build(); - - return operation -> { - String subpath = path.get(operation); - if (subpath == null) { - return null; - } - - subparams.setPath(subpath); - return Util.translateToMap(name + "." + operation, subparams); - }; - } - - /** - * Validates the parameters. - * - * @param name name of the object containing these parameters - * @return "this" - * @throws IllegalArgumentException if the parameters are invalid + * Amount of time, in seconds, to wait for the HTTP request to complete. The default + * is 90 seconds. */ - public HttpActorParams doValidation(String name) { - ValidationResult result = validate(name); - if (!result.isValid()) { - throw new ParameterValidationRuntimeException("invalid parameters", result); - } - - return this; - } - - /** - * Validates the parameters. - * - * @param resultName name of the result - * - * @return the validation result - */ - public ValidationResult validate(String resultName) { - BeanValidationResult result = new BeanValidator().validateTop(resultName, this); - - result.validateMap("path", path, (result2, entry) -> result2.validateNotNull(entry.getKey(), entry.getValue())); - - return result; - } + @Min(1) + private int timeoutSec = 90; } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java index 93711c032..2d3ab8b54 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParams.java @@ -48,12 +48,13 @@ public class HttpParams { private String path; /** - * Amount of time, in seconds to wait for the HTTP request to complete, where zero - * indicates that it should wait forever. The default is zero. + * Amount of time, in seconds, to wait for the HTTP request to complete. + * <p/> + * Note: this should NOT have a default value, as it receives its default value from + * {@link HttpActorParams}. */ - @Min(0) - @Builder.Default - private int timeoutSec = 0; + @Min(1) + private int timeoutSec; /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java new file mode 100644 index 000000000..30ee1e2d0 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandler.java @@ -0,0 +1,79 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.topic; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; + +/** + * Handler for a bidirectional topic, supporting both publishing and forwarding of + * incoming messages. + */ +public class BidirectionalTopicHandler extends BidirectionalTopicClient { + + /** + * Listener that will be attached to the topic to receive responses. + */ + private final TopicListenerImpl listener = new TopicListenerImpl(); + + + /** + * Constructs the object. + * + * @param sinkTopic sink topic name + * @param sourceTopic source topic name + * @throws BidirectionalTopicClientException if an error occurs + */ + public BidirectionalTopicHandler(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { + super(sinkTopic, sourceTopic); + } + + /** + * Starts listening on the source topic(s). + */ + public void start() { + getSource().register(listener); + } + + /** + * Stops listening on the source topic(s). + */ + public void stop() { + getSource().unregister(listener); + } + + /** + * Stops listening on the source topic(s) and clears all of the forwarders. + */ + public void shutdown() { + stop(); + listener.shutdown(); + } + + public Forwarder addForwarder(SelectorKey... keys) { + return listener.addForwarder(keys); + } + + public Forwarder addForwarder(List<SelectorKey> keys) { + return listener.addForwarder(keys); + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java index c351f95f6..10411875a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairManager.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicManager.java @@ -21,17 +21,17 @@ package org.onap.policy.controlloop.actorserviceprovider.topic; /** - * Manages topic pairs. + * Manages bidirectional topics. */ @FunctionalInterface -public interface TopicPairManager { +public interface BidirectionalTopicManager { /** - * Gets the topic pair for the given parameters, creating one if it does not exist. + * Gets the topic handler for the given parameters, creating one if it does not exist. * - * @param source source topic name - * @param target target topic name - * @return the topic pair associated with the given source and target topics + * @param sinkTopic sink topic name + * @param sourceTopic source topic name + * @return the topic handler associated with the given sink and source topic names */ - TopicPair getTopicPair(String source, String target); + BidirectionalTopicHandler getTopicHandler(String sinkTopic, String sourceTopic); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java index 8e9109c9e..2d98b66fc 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/Forwarder.java @@ -24,8 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; +import java.util.function.BiConsumer; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.onap.policy.controlloop.actorserviceprovider.Util; import org.slf4j.Logger; @@ -43,7 +42,7 @@ public class Forwarder { * Maps a set of field values to one or more listeners. */ // @formatter:off - private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>> + private final Map<List<String>, Map<BiConsumer<String, StandardCoderObject>, String>> values2listeners = new ConcurrentHashMap<>(); // @formatter:on @@ -68,13 +67,13 @@ public class Forwarder { * @param values field values of interest, in one-to-one correspondence with the keys * @param listener listener to register */ - public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) { + public void register(List<String> values, BiConsumer<String, StandardCoderObject> listener) { if (keys.size() != values.size()) { throw new IllegalArgumentException("key/value mismatch"); } values2listeners.compute(values, (key, listeners) -> { - Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners; + Map<BiConsumer<String, StandardCoderObject>, String> map = listeners; if (map == null) { map = new ConcurrentHashMap<>(); } @@ -90,7 +89,7 @@ public class Forwarder { * @param values field values of interest, in one-to-one correspondence with the keys * @param listener listener to unregister */ - public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) { + public void unregister(List<String> values, BiConsumer<String, StandardCoderObject> listener) { values2listeners.computeIfPresent(values, (key, listeners) -> { listeners.remove(listener); return (listeners.isEmpty() ? null : listeners); @@ -100,11 +99,10 @@ public class Forwarder { /** * Processes a message, forwarding it to the appropriate listeners, if any. * - * @param infra communication infrastructure on which the response was received * @param textMessage original text message that was received * @param scoMessage decoded text message */ - public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) { + public void onMessage(String textMessage, StandardCoderObject scoMessage) { // extract the key values from the message List<String> values = new ArrayList<>(keys.size()); for (SelectorKey key : keys) { @@ -121,8 +119,7 @@ public class Forwarder { } // get the listeners for this set of values - Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners = - values2listeners.get(values); + Map<BiConsumer<String, StandardCoderObject>, String> listeners = values2listeners.get(values); if (listeners == null) { // no listeners for this particular list of values return; @@ -130,9 +127,9 @@ public class Forwarder { // forward the message to each listener - for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) { + for (BiConsumer<String, StandardCoderObject> listener : listeners.keySet()) { try { - listener.accept(infra, textMessage, scoMessage); + listener.accept(textMessage, scoMessage); } catch (RuntimeException e) { logger.warn("exception thrown by listener {}", Util.ident(listener), e); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java index eb805ca5d..fcb463518 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java @@ -98,7 +98,7 @@ public class TopicListenerImpl implements TopicListener { * them all take a crack at it. */ for (Forwarder forwarder : selector2forwarder.values()) { - forwarder.onMessage(infra, message, object); + forwarder.onMessage(message, object); } } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java deleted file mode 100644 index c0cfe2571..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPair.java +++ /dev/null @@ -1,122 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.topic; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A pair of topics, one of which is used to publish requests and the other to receive - * responses. - */ -public class TopicPair extends TopicListenerImpl { - private static final Logger logger = LoggerFactory.getLogger(TopicPair.class); - - @Getter - private final String source; - - @Getter - private final String target; - - private final List<TopicSink> publishers; - private final List<TopicSource> subscribers; - - /** - * Constructs the object. - * - * @param source source topic name - * @param target target topic name - */ - public TopicPair(String source, String target) { - this.source = source; - this.target = target; - - publishers = getTopicEndpointManager().getTopicSinks(target); - if (publishers.isEmpty()) { - throw new IllegalArgumentException("no sinks for topic: " + target); - } - - subscribers = getTopicEndpointManager().getTopicSources(Arrays.asList(source)); - if (subscribers.isEmpty()) { - throw new IllegalArgumentException("no sources for topic: " + source); - } - } - - /** - * Starts listening on the source topic(s). - */ - public void start() { - subscribers.forEach(topic -> topic.register(this)); - } - - /** - * Stops listening on the source topic(s). - */ - public void stop() { - subscribers.forEach(topic -> topic.unregister(this)); - } - - /** - * Stops listening on the source topic(s) and clears all of the forwarders. - */ - @Override - public void shutdown() { - stop(); - super.shutdown(); - } - - /** - * Publishes a message to the target topic. - * - * @param message message to be published - * @return a list of the infrastructures on which it was published - */ - public List<CommInfrastructure> publish(String message) { - List<CommInfrastructure> infrastructures = new ArrayList<>(publishers.size()); - - for (TopicSink topic : publishers) { - try { - topic.send(message); - infrastructures.add(topic.getTopicCommInfrastructure()); - - } catch (RuntimeException e) { - logger.warn("cannot publish to {}:{}", topic.getTopicCommInfrastructure(), target, e); - } - } - - return infrastructures; - } - - // these may be overridden by junit tests - - protected TopicEndpoint getTopicEndpointManager() { - return TopicEndpointManager.getManager(); - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java index 851a79129..efc7bb830 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/ActorServiceTest.java @@ -65,7 +65,7 @@ public class ActorServiceTest { private Map<String, Object> sub2; private Map<String, Object> sub3; private Map<String, Object> sub4; - private Map<String, Object> params; + private Map<String, Map<String, Object>> params; private ActorService service; diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActorTest.java new file mode 100644 index 000000000..e1606aeaf --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicActorTest.java @@ -0,0 +1,242 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.impl; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; +import java.util.function.Function; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; + +public class BidirectionalTopicActorTest { + + private static final String ACTOR = "my-actor"; + private static final String UNKNOWN = "unknown"; + private static final String MY_SINK = "my-sink"; + private static final String MY_SOURCE1 = "my-source-A"; + private static final String MY_SOURCE2 = "my-source-B"; + private static final int TIMEOUT = 10; + + @Mock + private BidirectionalTopicHandler handler1; + @Mock + private BidirectionalTopicHandler handler2; + + private BidirectionalTopicActor actor; + + + /** + * Configures the endpoints. + */ + @BeforeClass + public static void setUpBeforeClass() { + Properties props = new Properties(); + props.setProperty("noop.sink.topics", MY_SINK); + props.setProperty("noop.source.topics", MY_SOURCE1 + "," + MY_SOURCE2); + + // clear all topics and then configure one sink and two sources + TopicEndpointManager.getManager().shutdown(); + TopicEndpointManager.getManager().addTopicSinks(props); + TopicEndpointManager.getManager().addTopicSources(props); + } + + @AfterClass + public static void tearDownAfterClass() { + // clear all topics after the tests + TopicEndpointManager.getManager().shutdown(); + } + + /** + * Sets up. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + actor = new MyActor(); + actor.configure(Util.translateToMap(ACTOR, makeParams())); + } + + @Test + public void testDoStart() throws BidirectionalTopicClientException { + // allocate some handlers + actor.getTopicHandler(MY_SINK, MY_SOURCE1); + actor.getTopicHandler(MY_SINK, MY_SOURCE2); + + // start it + actor.start(); + + verify(handler1).start(); + verify(handler2).start(); + + verify(handler1, never()).stop(); + verify(handler2, never()).stop(); + + verify(handler1, never()).shutdown(); + verify(handler2, never()).shutdown(); + } + + @Test + public void testDoStop() throws BidirectionalTopicClientException { + // allocate some handlers + actor.getTopicHandler(MY_SINK, MY_SOURCE1); + actor.getTopicHandler(MY_SINK, MY_SOURCE2); + + // start it + actor.start(); + + // stop it + actor.stop(); + + verify(handler1).stop(); + verify(handler2).stop(); + + verify(handler1, never()).shutdown(); + verify(handler2, never()).shutdown(); + } + + @Test + public void testDoShutdown() { + // allocate some handlers + actor.getTopicHandler(MY_SINK, MY_SOURCE1); + actor.getTopicHandler(MY_SINK, MY_SOURCE2); + + // start it + actor.start(); + + // stop it + actor.shutdown(); + + verify(handler1).shutdown(); + verify(handler2).shutdown(); + + verify(handler1, never()).stop(); + verify(handler2, never()).stop(); + } + + @Test + public void testMakeOperatorParameters() { + BidirectionalTopicActorParams params = makeParams(); + + final BidirectionalTopicActor prov = new BidirectionalTopicActor(ACTOR); + Function<String, Map<String, Object>> maker = + prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params)); + + assertNull(maker.apply(UNKNOWN)); + + // use a TreeMap to ensure the properties are sorted + assertEquals("{sinkTopic=my-sink, sourceTopic=my-source-A, timeoutSec=10}", + new TreeMap<>(maker.apply("operA")).toString()); + + assertEquals("{sinkTopic=my-sink, sourceTopic=topicB, timeoutSec=10}", + new TreeMap<>(maker.apply("operB")).toString()); + + // with invalid actor parameters + params.setOperation(null); + assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params))) + .isInstanceOf(ParameterValidationRuntimeException.class); + } + + @Test + public void testBidirectionalTopicActor() { + assertEquals(ACTOR, actor.getName()); + assertEquals(ACTOR, actor.getFullName()); + } + + @Test + public void testGetTopicHandler() { + assertSame(handler1, actor.getTopicHandler(MY_SINK, MY_SOURCE1)); + assertSame(handler2, actor.getTopicHandler(MY_SINK, MY_SOURCE2)); + + assertThatIllegalArgumentException().isThrownBy(() -> actor.getTopicHandler(UNKNOWN, MY_SOURCE1)); + } + + @Test + public void testMakeTopicHandler() { + // use a real actor + actor = new BidirectionalTopicActor(ACTOR); + + handler1 = actor.getTopicHandler(MY_SINK, MY_SOURCE1); + handler2 = actor.getTopicHandler(MY_SINK, MY_SOURCE2); + + assertNotNull(handler1); + assertNotNull(handler2); + assertNotSame(handler1, handler2); + } + + + private BidirectionalTopicActorParams makeParams() { + BidirectionalTopicActorParams params = new BidirectionalTopicActorParams(); + params.setSinkTopic(MY_SINK); + params.setSourceTopic(MY_SOURCE1); + params.setTimeoutSec(TIMEOUT); + + // @formatter:off + params.setOperation(Map.of( + "operA", Map.of(), + "operB", Map.of("sourceTopic", "topicB"))); + // @formatter:on + return params; + } + + private class MyActor extends BidirectionalTopicActor { + + public MyActor() { + super(ACTOR); + } + + @Override + protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic) + throws BidirectionalTopicClientException { + + if (MY_SINK.equals(sinkTopic)) { + if (MY_SOURCE1.equals(sourceTopic)) { + return handler1; + } else if (MY_SOURCE2.equals(sourceTopic)) { + return handler2; + } + } + + throw new BidirectionalTopicClientException("no topic " + sinkTopic + "/" + sourceTopic); + } + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperationTest.java index 4e45b1abe..ceb63fe91 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperationTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperationTest.java @@ -20,7 +20,6 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; @@ -35,96 +34,64 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import ch.qos.logback.classic.Logger; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import lombok.Getter; import lombok.Setter; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.onap.policy.common.utils.time.PseudoExecutor; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; import org.onap.policy.controlloop.policy.PolicyResult; -import org.slf4j.LoggerFactory; -public class TopicPairOperationTest { - private static final List<CommInfrastructure> INFRA_LIST = - Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB); +public class BidirectionalTopicOperationTest { + private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP; private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception"); private static final String ACTOR = "my-actor"; private static final String OPERATION = "my-operation"; private static final String REQ_ID = "my-request-id"; + private static final String MY_SINK = "my-sink"; private static final String MY_SOURCE = "my-source"; - private static final String MY_TARGET = "my-target"; private static final String TEXT = "some text"; private static final int TIMEOUT_SEC = 10; private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC; + private static final int MAX_REQUESTS = 100; private static final StandardCoder coder = new StandardCoder(); - /** - * Used to attach an appender to the class' logger. - */ - private static final Logger logger = (Logger) LoggerFactory.getLogger(TopicPairOperation.class); - private static final ExtractAppender appender = new ExtractAppender(); - @Mock - private TopicPairOperator operator; + private BidirectionalTopicOperator operator; @Mock - private TopicPair pair; + private BidirectionalTopicHandler handler; @Mock private Forwarder forwarder; @Captor - private ArgumentCaptor<TriConsumer<CommInfrastructure, String, StandardCoderObject>> listenerCaptor; + private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor; private ControlLoopOperationParams params; - private TopicPairParams topicParams; + private BidirectionalTopicParams topicParams; private OperationOutcome outcome; private StandardCoderObject stdResponse; private String responseText; - private MyExec executor; - private TopicPairOperation<MyRequest, MyResponse> oper; - - /** - * Attaches the appender to the logger. - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - /** - * Attach appender to the logger. - */ - appender.setContext(logger.getLoggerContext()); - appender.start(); - - logger.addAppender(appender); - } - - /** - * Stops the appender. - */ - @AfterClass - public static void tearDownAfterClass() { - appender.stop(); - } + private PseudoExecutor executor; + private int ntimes; + private BidirectionalTopicOperation<MyRequest, MyResponse> oper; /** * Sets up. @@ -133,20 +100,20 @@ public class TopicPairOperationTest { public void setUp() throws CoderException { MockitoAnnotations.initMocks(this); - appender.clearExtractions(); - - topicParams = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build(); + topicParams = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK) + .timeoutSec(TIMEOUT_SEC).build(); when(operator.getActorName()).thenReturn(ACTOR); when(operator.getName()).thenReturn(OPERATION); - when(operator.getTopicPair()).thenReturn(pair); + when(operator.getTopicHandler()).thenReturn(handler); when(operator.getForwarder()).thenReturn(forwarder); when(operator.getParams()).thenReturn(topicParams); when(operator.isAlive()).thenReturn(true); - when(pair.publish(any())).thenReturn(INFRA_LIST); + when(handler.send(any())).thenReturn(true); + when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA); - executor = new MyExec(100); + executor = new PseudoExecutor(); params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build(); outcome = params.makeOutcome(); @@ -154,22 +121,28 @@ public class TopicPairOperationTest { responseText = coder.encode(new MyResponse()); stdResponse = coder.decode(responseText, StandardCoderObject.class); + ntimes = 1; + oper = new MyOperation(); } @Test - public void testTopicPairOperation_testGetTopicPair_testGetForwarder_testGetPairParams() { + public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() { assertEquals(ACTOR, oper.getActorName()); assertEquals(OPERATION, oper.getName()); - assertSame(pair, oper.getTopicPair()); + assertSame(handler, oper.getTopicHandler()); assertSame(forwarder, oper.getForwarder()); - assertSame(topicParams, oper.getPairParams()); + assertSame(topicParams, oper.getTopicParams()); assertEquals(TIMEOUT_MS, oper.getTimeoutMs()); assertSame(MyResponse.class, oper.getResponseClass()); } @Test public void testStartOperationAsync() throws Exception { + + // tell it to expect three responses + ntimes = 3; + CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome); assertFalse(future.isDone()); @@ -177,17 +150,24 @@ public class TopicPairOperationTest { verify(forwarder, never()).unregister(any(), any()); - verify(pair).publish(any()); + verify(handler).send(any()); - // provide the response - listenerCaptor.getValue().accept(CommInfrastructure.NOOP, responseText, stdResponse); + // provide first response + listenerCaptor.getValue().accept(responseText, stdResponse); + assertTrue(executor.runAll(MAX_REQUESTS)); + assertFalse(future.isDone()); - // run the tasks - assertTrue(executor.runAll()); + // provide second response + listenerCaptor.getValue().accept(responseText, stdResponse); + assertTrue(executor.runAll(MAX_REQUESTS)); + assertFalse(future.isDone()); + // provide final response + listenerCaptor.getValue().accept(responseText, stdResponse); + assertTrue(executor.runAll(MAX_REQUESTS)); assertTrue(future.isDone()); - assertSame(outcome, future.get(5, TimeUnit.SECONDS)); + assertSame(outcome, future.get()); assertEquals(PolicyResult.SUCCESS, outcome.getResult()); verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue())); @@ -199,7 +179,7 @@ public class TopicPairOperationTest { @Test public void testStartOperationAsyncException() throws Exception { // indicate that nothing was published - when(pair.publish(any())).thenReturn(Arrays.asList()); + when(handler.send(any())).thenReturn(false); assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome)); @@ -221,8 +201,7 @@ public class TopicPairOperationTest { @Test public void testPublishRequest() { - oper.publishRequest(new MyRequest()); - assertEquals(INFRA_LIST.size(), appender.getExtracted().size()); + assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException(); } /** @@ -230,7 +209,7 @@ public class TopicPairOperationTest { */ @Test public void testPublishRequestUnpublished() { - when(pair.publish(any())).thenReturn(Arrays.asList()); + when(handler.send(any())).thenReturn(false); assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest())); } @@ -240,8 +219,7 @@ public class TopicPairOperationTest { @Test public void testPublishRequestString() { MyStringOperation oper2 = new MyStringOperation(); - oper2.publishRequest(TEXT); - assertEquals(INFRA_LIST.size(), appender.getExtracted().size()); + assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException(); } /** @@ -260,7 +238,7 @@ public class TopicPairOperationTest { public void testProcessResponseSuccessString() { MyStringOperation oper2 = new MyStringOperation(); - assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, TEXT, null)); + assertSame(outcome, oper2.processResponse(outcome, TEXT, null)); assertEquals(PolicyResult.SUCCESS, outcome.getResult()); } @@ -272,7 +250,7 @@ public class TopicPairOperationTest { public void testProcessResponseSuccessSco() { MyScoOperation oper2 = new MyScoOperation(); - assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse)); + assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse)); assertEquals(PolicyResult.SUCCESS, outcome.getResult()); } @@ -288,7 +266,7 @@ public class TopicPairOperationTest { responseText = coder.encode(resp); stdResponse = coder.decode(responseText, StandardCoderObject.class); - assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse)); + assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse)); assertEquals(PolicyResult.FAILURE, outcome.getResult()); } @@ -297,7 +275,7 @@ public class TopicPairOperationTest { */ @Test public void testProcessResponseDecodeOk() throws CoderException { - assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse)); + assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse)); assertEquals(PolicyResult.SUCCESS, outcome.getResult()); } @@ -308,7 +286,7 @@ public class TopicPairOperationTest { public void testProcessResponseDecodeExcept() throws CoderException { // @formatter:off assertThatIllegalArgumentException().isThrownBy( - () -> oper.processResponse(CommInfrastructure.NOOP, outcome, "{invalid json", stdResponse)); + () -> oper.processResponse(outcome, "{invalid json", stdResponse)); // @formatter:on } @@ -318,88 +296,6 @@ public class TopicPairOperationTest { } @Test - public void testLogTopicRequest() { - // nothing to log - appender.clearExtractions(); - oper.logTopicRequest(Arrays.asList(), new MyRequest()); - assertEquals(0, appender.getExtracted().size()); - - // log structured data - appender.clearExtractions(); - oper.logTopicRequest(INFRA_LIST, new MyRequest()); - List<String> output = appender.getExtracted(); - assertEquals(2, output.size()); - - assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()) - .contains("{\n \"theRequestId\": \"my-request-id\"\n}"); - - assertThat(output.get(1)).contains(CommInfrastructure.UEB.toString()) - .contains("{\n \"theRequestId\": \"my-request-id\"\n}"); - - // log a plain string - appender.clearExtractions(); - new MyStringOperation().logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), TEXT); - output = appender.getExtracted(); - assertEquals(1, output.size()); - assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT); - - // log a null request - appender.clearExtractions(); - oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), null); - output = appender.getExtracted(); - assertEquals(1, output.size()); - - assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null"); - - // exception from coder - setOperCoderException(); - - appender.clearExtractions(); - oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), new MyRequest()); - output = appender.getExtracted(); - assertEquals(2, output.size()); - assertThat(output.get(0)).contains("cannot pretty-print request"); - assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString()); - } - - @Test - public void testLogTopicResponse() { - // log structured data - appender.clearExtractions(); - oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse()); - List<String> output = appender.getExtracted(); - assertEquals(1, output.size()); - - assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()) - .contains("{\n \"requestId\": \"my-request-id\"\n}"); - - // log a plain string - appender.clearExtractions(); - new MyStringOperation().logTopicResponse(CommInfrastructure.NOOP, TEXT); - output = appender.getExtracted(); - assertEquals(1, output.size()); - assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT); - - // log a null response - appender.clearExtractions(); - oper.logTopicResponse(CommInfrastructure.NOOP, null); - output = appender.getExtracted(); - assertEquals(1, output.size()); - - assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null"); - - // exception from coder - setOperCoderException(); - - appender.clearExtractions(); - oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse()); - output = appender.getExtracted(); - assertEquals(2, output.size()); - assertThat(output.get(0)).contains("cannot pretty-print response"); - assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString()); - } - - @Test public void testMakeCoder() { assertNotNull(oper.makeCoder()); } @@ -436,9 +332,9 @@ public class TopicPairOperationTest { } - private class MyStringOperation extends TopicPairOperation<String, String> { + private class MyStringOperation extends BidirectionalTopicOperation<String, String> { public MyStringOperation() { - super(TopicPairOperationTest.this.params, operator, String.class); + super(BidirectionalTopicOperationTest.this.params, operator, String.class); } @Override @@ -452,15 +348,15 @@ public class TopicPairOperationTest { } @Override - protected boolean isSuccess(String rawResponse, String response) { - return (response != null); + protected Status detmStatus(String rawResponse, String response) { + return (response != null ? Status.SUCCESS : Status.FAILURE); } } - private class MyScoOperation extends TopicPairOperation<MyRequest, StandardCoderObject> { + private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> { public MyScoOperation() { - super(TopicPairOperationTest.this.params, operator, StandardCoderObject.class); + super(BidirectionalTopicOperationTest.this.params, operator, StandardCoderObject.class); } @Override @@ -474,15 +370,15 @@ public class TopicPairOperationTest { } @Override - protected boolean isSuccess(String rawResponse, StandardCoderObject response) { - return (response.getString("output") == null); + protected Status detmStatus(String rawResponse, StandardCoderObject response) { + return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE); } } - private class MyOperation extends TopicPairOperation<MyRequest, MyResponse> { + private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> { public MyOperation() { - super(TopicPairOperationTest.this.params, operator, MyResponse.class); + super(BidirectionalTopicOperationTest.this.params, operator, MyResponse.class); } @Override @@ -496,8 +392,12 @@ public class TopicPairOperationTest { } @Override - protected boolean isSuccess(String rawResponse, MyResponse response) { - return (response.getOutput() == null); + protected Status detmStatus(String rawResponse, MyResponse response) { + if (--ntimes <= 0) { + return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE); + } + + return Status.STILL_WAITING; } } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperatorTest.java index dd25902d6..4fae782bd 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/TopicPairOperatorTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperatorTest.java @@ -34,32 +34,32 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler; +import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager; import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder; import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair; -import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager; -public class TopicPairOperatorTest { +public class BidirectionalTopicOperatorTest { private static final String ACTOR = "my-actor"; private static final String OPERATION = "my-operation"; private static final String MY_SOURCE = "my-source"; - private static final String MY_TARGET = "my-target"; + private static final String MY_SINK = "my-target"; private static final int TIMEOUT_SEC = 10; @Mock - private TopicPairManager mgr; + private BidirectionalTopicManager mgr; @Mock - private TopicPair pair; + private BidirectionalTopicHandler handler; @Mock private Forwarder forwarder; @Mock - private TopicPairOperation<String, Integer> operation; + private BidirectionalTopicOperation<String, Integer> operation; private List<SelectorKey> keys; - private TopicPairParams params; + private BidirectionalTopicParams params; private MyOperator oper; /** @@ -71,22 +71,23 @@ public class TopicPairOperatorTest { keys = List.of(new SelectorKey("")); - when(mgr.getTopicPair(MY_SOURCE, MY_TARGET)).thenReturn(pair); - when(pair.addForwarder(keys)).thenReturn(forwarder); + when(mgr.getTopicHandler(MY_SINK, MY_SOURCE)).thenReturn(handler); + when(handler.addForwarder(keys)).thenReturn(forwarder); oper = new MyOperator(keys); - params = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build(); + params = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK).timeoutSec(TIMEOUT_SEC) + .build(); oper.configure(Util.translateToMap(OPERATION, params)); oper.start(); } @Test - public void testTopicPairOperator_testGetParams_testGetTopicPair_testGetForwarder() { + public void testConstructor_testGetParams_testGetTopicHandler_testGetForwarder() { assertEquals(ACTOR, oper.getActorName()); assertEquals(OPERATION, oper.getName()); assertEquals(params, oper.getParams()); - assertSame(pair, oper.getTopicPair()); + assertSame(handler, oper.getTopicHandler()); assertSame(forwarder, oper.getForwarder()); } @@ -95,7 +96,7 @@ public class TopicPairOperatorTest { oper.stop(); // invalid parameters - params.setSource(null); + params.setSourceTopic(null); assertThatThrownBy(() -> oper.configure(Util.translateToMap(OPERATION, params))) .isInstanceOf(ParameterValidationRuntimeException.class); } @@ -103,18 +104,20 @@ public class TopicPairOperatorTest { @Test public void testMakeOperator() { AtomicReference<ControlLoopOperationParams> paramsRef = new AtomicReference<>(); - AtomicReference<TopicPairOperator> operRef = new AtomicReference<>(); + AtomicReference<BidirectionalTopicOperator> operRef = new AtomicReference<>(); // @formatter:off - BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<String, Integer>> maker = - (params, operator) -> { - paramsRef.set(params); - operRef.set(operator); - return operation; - }; + BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator, + BidirectionalTopicOperation<String, Integer>> maker = + (params, operator) -> { + paramsRef.set(params); + operRef.set(operator); + return operation; + }; // @formatter:on - TopicPairOperator oper2 = TopicPairOperator.makeOperator(ACTOR, OPERATION, mgr, maker, new SelectorKey("")); + BidirectionalTopicOperator oper2 = + BidirectionalTopicOperator.makeOperator(ACTOR, OPERATION, mgr, maker, new SelectorKey("")); assertEquals(ACTOR, oper2.getActorName()); assertEquals(OPERATION, oper2.getName()); @@ -127,7 +130,7 @@ public class TopicPairOperatorTest { } - private class MyOperator extends TopicPairOperator { + private class MyOperator extends BidirectionalTopicOperator { public MyOperator(List<SelectorKey> selectorKeys) { super(ACTOR, OPERATION, mgr, selectorKeys); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java index 8ce3b3230..80b1d427a 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpActorTest.java @@ -52,7 +52,12 @@ public class HttpActorTest { HttpActorParams params = new HttpActorParams(); params.setClientName(CLIENT); params.setTimeoutSec(TIMEOUT); - params.setPath(Map.of("operA", "urlA", "operB", "urlB")); + + // @formatter:off + params.setOperation(Map.of( + "operA", Map.of("path", "urlA"), + "operB", Map.of("path", "urlB"))); + // @formatter:on final HttpActor prov = new HttpActor(ACTOR); Function<String, Map<String, Object>> maker = @@ -68,7 +73,7 @@ public class HttpActorTest { new TreeMap<>(maker.apply("operB")).toString()); // with invalid actor parameters - params.setClientName(null); + params.setOperation(null); assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params))) .isInstanceOf(ParameterValidationRuntimeException.class); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java index 39d6fd431..50cb8fa8f 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java @@ -32,9 +32,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import ch.qos.logback.classic.Logger; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -65,6 +63,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; import org.onap.policy.common.endpoints.http.client.HttpClient; @@ -72,12 +71,10 @@ import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.http.server.HttpServletServer; import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.gson.GsonMessageBodyHandler; -import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.common.utils.network.NetworkUtil; -import org.onap.policy.common.utils.test.log.logback.ExtractAppender; import org.onap.policy.controlloop.VirtualControlLoopEvent; import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; @@ -86,7 +83,6 @@ import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopE import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams; import org.onap.policy.controlloop.policy.PolicyResult; -import org.slf4j.LoggerFactory; public class HttpOperationTest { @@ -96,19 +92,12 @@ public class HttpOperationTest { private static final String HTTP_CLIENT = "my-client"; private static final String HTTP_NO_SERVER = "my-http-no-server-client"; private static final String MEDIA_TYPE_APPLICATION_JSON = "application/json"; - private static final String MY_REQUEST = "my-request"; private static final String BASE_URI = "oper"; private static final String PATH = "/my-path"; private static final String TEXT = "my-text"; private static final UUID REQ_ID = UUID.randomUUID(); /** - * Used to attach an appender to the class' logger. - */ - private static final Logger logger = (Logger) LoggerFactory.getLogger(HttpOperation.class); - private static final ExtractAppender appender = new ExtractAppender(); - - /** * {@code True} if the server should reject the request, {@code false} otherwise. */ private static boolean rejectRequest; @@ -164,14 +153,6 @@ public class HttpOperationTest { HttpClientFactoryInstance.getClientFactory() .build(builder.clientName(HTTP_NO_SERVER).port(NetworkUtil.allocPort()).build()); - - /** - * Attach appender to the logger. - */ - appender.setContext(logger.getLoggerContext()); - appender.start(); - - logger.addAppender(appender); } /** @@ -179,8 +160,6 @@ public class HttpOperationTest { */ @AfterClass public static void tearDownAfterClass() { - appender.stop(); - HttpClientFactoryInstance.getClientFactory().destroy(); HttpServletServerFactoryInstance.getServerFactory().destroy(); } @@ -193,8 +172,6 @@ public class HttpOperationTest { public void setUp() { MockitoAnnotations.initMocks(this); - appender.clearExtractions(); - rejectRequest = false; nget = 0; npost = 0; @@ -260,9 +237,9 @@ public class HttpOperationTest { @Test public void testDoConfigureMapOfStringObject_testGetClient_testGetPath_testGetTimeoutMs() { - // no default yet - assertEquals(0L, oper.getTimeoutMs(null)); - assertEquals(0L, oper.getTimeoutMs(0)); + // use value from operator + assertEquals(1000L, oper.getTimeoutMs(null)); + assertEquals(1000L, oper.getTimeoutMs(0)); // should use given value assertEquals(20 * 1000L, oper.getTimeoutMs(20)); @@ -442,96 +419,6 @@ public class HttpOperationTest { } @Test - public void testLogRestRequest() throws CoderException { - // log structured data - appender.clearExtractions(); - oper.logRestRequest(PATH, new MyRequest()); - List<String> output = appender.getExtracted(); - assertEquals(1, output.size()); - - assertThat(output.get(0)).contains(PATH).contains("{\n \"input\": \"some input\"\n}"); - - // log a plain string - appender.clearExtractions(); - oper.logRestRequest(PATH, MY_REQUEST); - output = appender.getExtracted(); - assertEquals(1, output.size()); - - assertThat(output.get(0)).contains(PATH).contains(MY_REQUEST); - - // log a null request - appender.clearExtractions(); - oper.logRestRequest(PATH, null); - output = appender.getExtracted(); - assertEquals(1, output.size()); - - // exception from coder - oper = new MyGetOperation<>(String.class) { - @Override - protected Coder makeCoder() { - return new StandardCoder() { - @Override - public String encode(Object object, boolean pretty) throws CoderException { - throw new CoderException(EXPECTED_EXCEPTION); - } - }; - } - }; - - appender.clearExtractions(); - oper.logRestRequest(PATH, new MyRequest()); - output = appender.getExtracted(); - assertEquals(2, output.size()); - assertThat(output.get(0)).contains("cannot pretty-print request"); - assertThat(output.get(1)).contains(PATH); - } - - @Test - public void testLogRestResponse() throws CoderException { - // log structured data - appender.clearExtractions(); - oper.logRestResponse(PATH, new MyResponse()); - List<String> output = appender.getExtracted(); - assertEquals(1, output.size()); - - assertThat(output.get(0)).contains(PATH).contains("{\n \"output\": \"some output\"\n}"); - - // log a plain string - appender.clearExtractions(); - oper.logRestResponse(PATH, MY_REQUEST); - output = appender.getExtracted(); - assertEquals(1, output.size()); - - // log a null response - appender.clearExtractions(); - oper.logRestResponse(PATH, null); - output = appender.getExtracted(); - assertEquals(1, output.size()); - - assertThat(output.get(0)).contains(PATH).contains("null"); - - // exception from coder - oper = new MyGetOperation<>(String.class) { - @Override - protected Coder makeCoder() { - return new StandardCoder() { - @Override - public String encode(Object object, boolean pretty) throws CoderException { - throw new CoderException(EXPECTED_EXCEPTION); - } - }; - } - }; - - appender.clearExtractions(); - oper.logRestResponse(PATH, new MyResponse()); - output = appender.getExtracted(); - assertEquals(2, output.size()); - assertThat(output.get(0)).contains("cannot pretty-print response"); - assertThat(output.get(1)).contains(PATH); - } - - @Test public void testMakeDecoder() { assertNotNull(oper.makeCoder()); } @@ -569,7 +456,7 @@ public class HttpOperationTest { private void initOper(HttpOperator operator, String clientName) { operator.stop(); - HttpParams params = HttpParams.builder().clientName(clientName).path(PATH).build(); + HttpParams params = HttpParams.builder().clientName(clientName).path(PATH).timeoutSec(1).build(); Map<String, Object> mapParams = Util.translateToMap(OPERATION, params); operator.configure(mapParams); operator.start(); @@ -614,7 +501,7 @@ public class HttpOperationTest { headers.put("Accept", MediaType.APPLICATION_JSON); String url = makeUrl(); - logRestRequest(url, null); + logMessage(EventType.OUT, CommInfrastructure.REST, url, null); // @formatter:off return handleResponse(outcome, url, @@ -640,7 +527,7 @@ public class HttpOperationTest { headers.put("Accept", MediaType.APPLICATION_JSON); String url = makeUrl(); - logRestRequest(url, request); + logMessage(EventType.OUT, CommInfrastructure.REST, url, request); // @formatter:off return handleResponse(outcome, url, @@ -666,7 +553,7 @@ public class HttpOperationTest { headers.put("Accept", MediaType.APPLICATION_JSON); String url = makeUrl(); - logRestRequest(url, request); + logMessage(EventType.OUT, CommInfrastructure.REST, url, request); // @formatter:off return handleResponse(outcome, url, @@ -687,7 +574,7 @@ public class HttpOperationTest { headers.put("Accept", MediaType.APPLICATION_JSON); String url = makeUrl(); - logRestRequest(url, null); + logMessage(EventType.OUT, CommInfrastructure.REST, url, null); // @formatter:off return handleResponse(outcome, url, diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java deleted file mode 100644 index 6515eb37c..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/MyExec.java +++ /dev/null @@ -1,65 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.impl; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.concurrent.Executor; - -/** - * Executor that will run tasks until the queue is empty or a maximum number of tasks have - * been executed. Doesn't actually run anything until {@link #runAll()} is invoked. - */ -public class MyExec implements Executor { - - // TODO move this to policy-common/utils-test - - private final int maxTasks; - private final Queue<Runnable> commands = new LinkedList<>(); - - public MyExec(int maxTasks) { - this.maxTasks = maxTasks; - } - - public int getQueueLength() { - return commands.size(); - } - - @Override - public void execute(Runnable command) { - commands.add(command); - } - - /** - * Runs all tasks until the queue is empty or the maximum number of tasks have been - * reached. - * - * @return {@code true} if the queue is empty, {@code false} if the maximum number of - * tasks have been reached before the queue was completed - */ - public boolean runAll() { - for (int count = 0; count < maxTasks && !commands.isEmpty(); ++count) { - commands.remove().run(); - } - - return commands.isEmpty(); - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java index f28c1f6c6..67ac27c8d 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartialTest.java @@ -20,8 +20,8 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -30,6 +30,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import ch.qos.logback.classic.Logger; import java.time.Instant; import java.util.Arrays; import java.util.LinkedList; @@ -45,42 +46,59 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; +import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.onap.policy.common.utils.time.PseudoExecutor; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.VirtualControlLoopEvent; import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; -import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.LoggerFactory; public class OperationPartialTest { - private static final int MAX_PARALLEL_REQUESTS = 10; + private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP; + private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB; + private static final int MAX_REQUESTS = 100; + private static final int MAX_PARALLEL = 10; private static final String EXPECTED_EXCEPTION = "expected exception"; private static final String ACTOR = "my-actor"; private static final String OPERATION = "my-operation"; - private static final String TARGET = "my-target"; + private static final String MY_SINK = "my-sink"; + private static final String MY_SOURCE = "my-source"; + private static final String TEXT = "my-text"; private static final int TIMEOUT = 1000; private static final UUID REQ_ID = UUID.randomUUID(); private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream() .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList()); + /** + * Used to attach an appender to the class' logger. + */ + private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class); + private static final ExtractAppender appender = new ExtractAppender(); + private VirtualControlLoopEvent event; private ControlLoopEventContext context; - private MyExec executor; + private PseudoExecutor executor; private ControlLoopOperationParams params; private MyOper oper; @@ -96,6 +114,28 @@ public class OperationPartialTest { private OperatorPartial operator; /** + * Attaches the appender to the logger. + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + /** + * Attach appender to the logger. + */ + appender.setContext(logger.getLoggerContext()); + appender.start(); + + logger.addAppender(appender); + } + + /** + * Stops the appender. + */ + @AfterClass + public static void tearDownAfterClass() { + appender.stop(); + } + + /** * Initializes the fields, including {@link #oper}. */ @Before @@ -104,11 +144,11 @@ public class OperationPartialTest { event.setRequestId(REQ_ID); context = new ControlLoopEventContext(event); - executor = new MyExec(100 * MAX_PARALLEL_REQUESTS); + executor = new PseudoExecutor(); params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context) .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT) - .startCallback(this::starter).targetEntity(TARGET).build(); + .startCallback(this::starter).targetEntity(MY_SINK).build(); operator = new OperatorPartial(ACTOR, OPERATION) { @Override @@ -209,19 +249,19 @@ public class OperationPartialTest { */ @Test public void testStartMultiple() { - for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) { + for (int count = 0; count < MAX_PARALLEL; ++count) { oper.start(); } - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL)); assertNotNull(opstart); assertNotNull(opend); assertEquals(PolicyResult.SUCCESS, opend.getResult()); - assertEquals(MAX_PARALLEL_REQUESTS, numStart); - assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount()); - assertEquals(MAX_PARALLEL_REQUESTS, numEnd); + assertEquals(MAX_PARALLEL, numStart); + assertEquals(MAX_PARALLEL, oper.getCount()); + assertEquals(MAX_PARALLEL, numEnd); } /** @@ -254,7 +294,7 @@ public class OperationPartialTest { oper.setGuard(CompletableFuture.completedFuture(makeSuccess())); oper.start().cancel(false); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertNull(opstart); assertNull(opend); @@ -295,7 +335,7 @@ public class OperationPartialTest { @Test public void testStartOperationAsync() { oper.start(); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertEquals(1, oper.getCount()); } @@ -330,14 +370,14 @@ public class OperationPartialTest { outcome.setResult(PolicyResult.FAILURE); // incorrect actor - outcome.setActor(TARGET); + outcome.setActor(MY_SINK); assertFalse(oper.isActorFailed(outcome)); outcome.setActor(null); assertFalse(oper.isActorFailed(outcome)); outcome.setActor(ACTOR); // incorrect operation - outcome.setOperation(TARGET); + outcome.setOperation(MY_SINK); assertFalse(oper.isActorFailed(outcome)); outcome.setOperation(null); assertFalse(oper.isActorFailed(outcome)); @@ -355,7 +395,7 @@ public class OperationPartialTest { OperationPartial oper2 = new OperationPartial(params, operator) {}; oper2.start(); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertNotNull(opend); assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult()); @@ -519,14 +559,14 @@ public class OperationPartialTest { // wrong actor - should be false outcome.setActor(null); assertFalse(oper.isSameOperation(outcome)); - outcome.setActor(TARGET); + outcome.setActor(MY_SINK); assertFalse(oper.isSameOperation(outcome)); outcome.setActor(ACTOR); // wrong operation - should be null outcome.setOperation(null); assertFalse(oper.isSameOperation(outcome)); - outcome.setOperation(TARGET); + outcome.setOperation(MY_SINK); assertFalse(oper.isSameOperation(outcome)); outcome.setOperation(OPERATION); @@ -584,43 +624,47 @@ public class OperationPartialTest { @Test public void testAnyOf() throws Exception { // first task completes, others do not - List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); final OperationOutcome outcome = params.makeOutcome(); - tasks.add(CompletableFuture.completedFuture(outcome)); - tasks.add(new CompletableFuture<>()); - tasks.add(new CompletableFuture<>()); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); + tasks.add(() -> new CompletableFuture<>()); + tasks.add(() -> null); + tasks.add(() -> new CompletableFuture<>()); CompletableFuture<OperationOutcome> result = oper.anyOf(tasks); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + // repeat using array form + @SuppressWarnings("unchecked") + Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()]; + result = oper.anyOf(tasks.toArray(taskArray)); + assertTrue(executor.runAll(MAX_REQUESTS)); assertTrue(result.isDone()); assertSame(outcome, result.get()); // second task completes, others do not - tasks = new LinkedList<>(); - - tasks.add(new CompletableFuture<>()); - tasks.add(CompletableFuture.completedFuture(outcome)); - tasks.add(new CompletableFuture<>()); + tasks.clear(); + tasks.add(() -> new CompletableFuture<>()); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); + tasks.add(() -> new CompletableFuture<>()); result = oper.anyOf(tasks); - assertTrue(executor.runAll()); - + assertTrue(executor.runAll(MAX_REQUESTS)); assertTrue(result.isDone()); assertSame(outcome, result.get()); // third task completes, others do not - tasks = new LinkedList<>(); - - tasks.add(new CompletableFuture<>()); - tasks.add(new CompletableFuture<>()); - tasks.add(CompletableFuture.completedFuture(outcome)); + tasks.clear(); + tasks.add(() -> new CompletableFuture<>()); + tasks.add(() -> new CompletableFuture<>()); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); result = oper.anyOf(tasks); - assertTrue(executor.runAll()); - + assertTrue(executor.runAll(MAX_REQUESTS)); assertTrue(result.isDone()); assertSame(outcome, result.get()); } @@ -631,54 +675,82 @@ public class OperationPartialTest { @Test @SuppressWarnings("unchecked") public void testAnyOfEdge() throws Exception { - List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); // zero items: check both using a list and using an array - assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks)); - assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf()); + assertNull(oper.anyOf(tasks)); + assertNull(oper.anyOf()); // one item: : check both using a list and using an array CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>(); - tasks.add(future1); + tasks.add(() -> future1); assertSame(future1, oper.anyOf(tasks)); - assertSame(future1, oper.anyOf(future1)); + assertSame(future1, oper.anyOf(() -> future1)); } - /** - * Tests both flavors of allOf(), because one invokes the other. - */ @Test - public void testAllOf() throws Exception { - List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + public void testAllOfArray() throws Exception { + final OperationOutcome outcome = params.makeOutcome(); + CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>(); + CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>(); + CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>(); + + @SuppressWarnings("unchecked") + CompletableFuture<OperationOutcome> result = + oper.allOf(() -> future1, () -> future2, () -> null, () -> future3); + + assertTrue(executor.runAll(MAX_REQUESTS)); + assertFalse(result.isDone()); + future1.complete(outcome); + + // complete 3 before 2 + assertTrue(executor.runAll(MAX_REQUESTS)); + assertFalse(result.isDone()); + future3.complete(outcome); + + assertTrue(executor.runAll(MAX_REQUESTS)); + assertFalse(result.isDone()); + future2.complete(outcome); + + // all of them are now done + assertTrue(executor.runAll(MAX_REQUESTS)); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + } + + @Test + public void testAllOfList() throws Exception { final OperationOutcome outcome = params.makeOutcome(); CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>(); CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>(); CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>(); - tasks.add(future1); - tasks.add(future2); - tasks.add(future3); + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); + tasks.add(() -> future1); + tasks.add(() -> future2); + tasks.add(() -> null); + tasks.add(() -> future3); CompletableFuture<OperationOutcome> result = oper.allOf(tasks); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertFalse(result.isDone()); future1.complete(outcome); // complete 3 before 2 - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertFalse(result.isDone()); future3.complete(outcome); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertFalse(result.isDone()); future2.complete(outcome); // all of them are now done - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertTrue(result.isDone()); assertSame(outcome, result.get()); } @@ -689,18 +761,41 @@ public class OperationPartialTest { @Test @SuppressWarnings("unchecked") public void testAllOfEdge() throws Exception { - List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); // zero items: check both using a list and using an array - assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks)); - assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf()); + assertNull(oper.allOf(tasks)); + assertNull(oper.allOf()); // one item: : check both using a list and using an array CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>(); - tasks.add(future1); + tasks.add(() -> future1); assertSame(future1, oper.allOf(tasks)); - assertSame(future1, oper.allOf(future1)); + assertSame(future1, oper.allOf(() -> future1)); + } + + @Test + public void testAttachFutures() throws Exception { + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); + + // third task throws an exception during construction + CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>(); + CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>(); + CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>(); + tasks.add(() -> future1); + tasks.add(() -> future2); + tasks.add(() -> { + throw new IllegalStateException(EXPECTED_EXCEPTION); + }); + tasks.add(() -> future3); + + assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION); + + // should have canceled the first two, but not the last + assertTrue(future1.isCancelled()); + assertTrue(future2.isCancelled()); + assertFalse(future3.isCancelled()); } @Test @@ -714,12 +809,14 @@ public class OperationPartialTest { verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD); verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE); - // null outcome - final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); - tasks.add(CompletableFuture.completedFuture(null)); + // null outcome - takes precedence over a success + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); + tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome())); + tasks.add(() -> CompletableFuture.completedFuture(null)); + tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome())); CompletableFuture<OperationOutcome> result = oper.allOf(tasks); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertTrue(result.isDone()); assertNull(result.get()); @@ -727,26 +824,85 @@ public class OperationPartialTest { IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION); tasks.clear(); - tasks.add(CompletableFuture.completedFuture(params.makeOutcome())); - tasks.add(CompletableFuture.failedFuture(except)); - tasks.add(CompletableFuture.completedFuture(params.makeOutcome())); + tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome())); + tasks.add(() -> CompletableFuture.failedFuture(except)); + tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome())); result = oper.allOf(tasks); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertTrue(result.isCompletedExceptionally()); result.whenComplete((unused, thrown) -> assertSame(except, thrown)); } - private void verifyOutcomes(int expected, PolicyResult... results) throws Exception { - List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>(); + /** + * Tests both flavors of sequence(), because one invokes the other. + */ + @Test + public void testSequence() throws Exception { + final OperationOutcome outcome = params.makeOutcome(); + + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); + tasks.add(() -> null); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); + + CompletableFuture<OperationOutcome> result = oper.sequence(tasks); + assertTrue(executor.runAll(MAX_REQUESTS)); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + + // repeat using array form + @SuppressWarnings("unchecked") + Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()]; + result = oper.sequence(tasks.toArray(taskArray)); + assertTrue(executor.runAll(MAX_REQUESTS)); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); + + // second task fails, third should not run + OperationOutcome failure = params.makeOutcome(); + failure.setResult(PolicyResult.FAILURE); + tasks.clear(); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); + tasks.add(() -> CompletableFuture.completedFuture(failure)); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); + + result = oper.sequence(tasks); + assertTrue(executor.runAll(MAX_REQUESTS)); + assertTrue(result.isDone()); + assertSame(failure, result.get()); + } + + /** + * Tests both flavors of sequence(), for edge cases: zero items, and one item. + */ + @Test + @SuppressWarnings("unchecked") + public void testSequenceEdge() throws Exception { + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); + + // zero items: check both using a list and using an array + assertNull(oper.sequence(tasks)); + assertNull(oper.sequence()); + // one item: : check both using a list and using an array + CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>(); + tasks.add(() -> future1); + + assertSame(future1, oper.sequence(tasks)); + assertSame(future1, oper.sequence(() -> future1)); + } + + private void verifyOutcomes(int expected, PolicyResult... results) throws Exception { + List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>(); OperationOutcome expectedOutcome = null; for (int count = 0; count < results.length; ++count) { OperationOutcome outcome = params.makeOutcome(); outcome.setResult(results[count]); - tasks.add(CompletableFuture.completedFuture(outcome)); + tasks.add(() -> CompletableFuture.completedFuture(outcome)); if (count == expected) { expectedOutcome = outcome; @@ -755,17 +911,11 @@ public class OperationPartialTest { CompletableFuture<OperationOutcome> result = oper.allOf(tasks); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); assertTrue(result.isDone()); assertSame(expectedOutcome, result.get()); } - private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask( - final OperationOutcome taskOutcome) { - - return outcome -> CompletableFuture.completedFuture(taskOutcome); - } - @Test public void testDetmPriority() throws CoderException { assertEquals(1, oper.detmPriority(null)); @@ -790,210 +940,6 @@ public class OperationPartialTest { } /** - * Tests doTask(Future) when the controller is not running. - */ - @Test - public void testDoTaskFutureNotRunning() throws Exception { - CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); - - PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - controller.complete(params.makeOutcome()); - - CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture); - assertFalse(future.isDone()); - assertTrue(executor.runAll()); - - // should not have run the task - assertFalse(future.isDone()); - - // should have canceled the task future - assertTrue(taskFuture.isCancelled()); - } - - /** - * Tests doTask(Future) when the previous outcome was successful. - */ - @Test - public void testDoTaskFutureSuccess() throws Exception { - CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); - final OperationOutcome taskOutcome = params.makeOutcome(); - - PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture); - - taskFuture.complete(taskOutcome); - assertTrue(executor.runAll()); - - assertTrue(future.isDone()); - assertSame(taskOutcome, future.get()); - - // controller should not be done yet - assertFalse(controller.isDone()); - } - - /** - * Tests doTask(Future) when the previous outcome was failed. - */ - @Test - public void testDoTaskFutureFailure() throws Exception { - CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); - final OperationOutcome failedOutcome = params.makeOutcome(); - failedOutcome.setResult(PolicyResult.FAILURE); - - PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture); - assertFalse(future.isDone()); - assertTrue(executor.runAll()); - - // should not have run the task - assertFalse(future.isDone()); - - // should have canceled the task future - assertTrue(taskFuture.isCancelled()); - - // controller SHOULD be done now - assertTrue(controller.isDone()); - assertSame(failedOutcome, controller.get()); - } - - /** - * Tests doTask(Future) when the previous outcome was failed, but not checking - * success. - */ - @Test - public void testDoTaskFutureUncheckedFailure() throws Exception { - CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>(); - final OperationOutcome failedOutcome = params.makeOutcome(); - failedOutcome.setResult(PolicyResult.FAILURE); - - PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture); - assertFalse(future.isDone()); - - // complete the task - OperationOutcome taskOutcome = params.makeOutcome(); - taskFuture.complete(taskOutcome); - - assertTrue(executor.runAll()); - - // should have run the task - assertTrue(future.isDone()); - - assertTrue(future.isDone()); - assertSame(taskOutcome, future.get()); - - // controller should not be done yet - assertFalse(controller.isDone()); - } - - /** - * Tests doTask(Function) when the controller is not running. - */ - @Test - public void testDoTaskFunctionNotRunning() throws Exception { - AtomicBoolean invoked = new AtomicBoolean(); - - Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> { - invoked.set(true); - return CompletableFuture.completedFuture(params.makeOutcome()); - }; - - PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - controller.complete(params.makeOutcome()); - - CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome()); - assertFalse(future.isDone()); - assertTrue(executor.runAll()); - - // should not have run the task - assertFalse(future.isDone()); - - // should not have even invoked the task - assertFalse(invoked.get()); - } - - /** - * Tests doTask(Function) when the previous outcome was successful. - */ - @Test - public void testDoTaskFunctionSuccess() throws Exception { - final OperationOutcome taskOutcome = params.makeOutcome(); - - final OperationOutcome failedOutcome = params.makeOutcome(); - - Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome); - - PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome); - - assertTrue(future.isDone()); - assertSame(taskOutcome, future.get()); - - // controller should not be done yet - assertFalse(controller.isDone()); - } - - /** - * Tests doTask(Function) when the previous outcome was failed. - */ - @Test - public void testDoTaskFunctionFailure() throws Exception { - final OperationOutcome failedOutcome = params.makeOutcome(); - failedOutcome.setResult(PolicyResult.FAILURE); - - AtomicBoolean invoked = new AtomicBoolean(); - - Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> { - invoked.set(true); - return CompletableFuture.completedFuture(params.makeOutcome()); - }; - - PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome); - assertFalse(future.isDone()); - assertTrue(executor.runAll()); - - // should not have run the task - assertFalse(future.isDone()); - - // should not have even invoked the task - assertFalse(invoked.get()); - - // controller should have the failed task - assertTrue(controller.isDone()); - assertSame(failedOutcome, controller.get()); - } - - /** - * Tests doTask(Function) when the previous outcome was failed, but not checking - * success. - */ - @Test - public void testDoTaskFunctionUncheckedFailure() throws Exception { - final OperationOutcome taskOutcome = params.makeOutcome(); - - final OperationOutcome failedOutcome = params.makeOutcome(); - failedOutcome.setResult(PolicyResult.FAILURE); - - Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome); - - PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); - - CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome); - - assertTrue(future.isDone()); - assertSame(taskOutcome, future.get()); - - // controller should not be done yet - assertFalse(controller.isDone()); - } - - /** * Tests callbackStarted() when the pipeline has already been stopped. */ @Test @@ -1013,7 +959,7 @@ public class OperationPartialTest { oper = new MyOper(); future.set(oper.start()); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); // should have only run once assertEquals(1, numStart); @@ -1035,7 +981,7 @@ public class OperationPartialTest { oper = new MyOper(); future.set(oper.start()); - assertTrue(executor.runAll()); + assertTrue(executor.runAll(MAX_REQUESTS)); // should not have been set assertNull(opend); @@ -1091,6 +1037,62 @@ public class OperationPartialTest { } @Test + public void testLogMessage() { + final String infraStr = SINK_INFRA.toString(); + + // log structured data + appender.clearExtractions(); + oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData()); + List<String> output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT") + .contains("{\n \"text\": \"my-text\"\n}"); + + // repeat with a response + appender.clearExtractions(); + oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData()); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN") + .contains("{\n \"text\": \"my-text\"\n}"); + + // log a plain string + appender.clearExtractions(); + oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT); + output = appender.getExtracted(); + assertEquals(1, output.size()); + assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT); + + // log a null request + appender.clearExtractions(); + oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null); + output = appender.getExtracted(); + assertEquals(1, output.size()); + + assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null"); + + // generate exception from coder + setOperCoderException(); + + appender.clearExtractions(); + oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData()); + output = appender.getExtracted(); + assertEquals(2, output.size()); + assertThat(output.get(0)).contains("cannot pretty-print request"); + assertThat(output.get(1)).contains(infraStr).contains(MY_SINK); + + // repeat with a response + appender.clearExtractions(); + oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData()); + output = appender.getExtracted(); + assertEquals(2, output.size()); + assertThat(output.get(0)).contains("cannot pretty-print response"); + assertThat(output.get(1)).contains(MY_SOURCE); + } + + @Test public void testGetRetry() { assertEquals(0, oper.getRetry(null)); assertEquals(10, oper.getRetry(10)); @@ -1187,7 +1189,7 @@ public class OperationPartialTest { manipulator.accept(future); - assertTrue(testName, executor.runAll()); + assertTrue(testName, executor.runAll(MAX_REQUESTS)); assertEquals(testName, expectedCallbacks, numStart); assertEquals(testName, expectedCallbacks, numEnd); @@ -1216,6 +1218,30 @@ public class OperationPartialTest { assertEquals(testName, expectedOperations, oper.getCount()); } + /** + * Creates a new {@link #oper} whose coder will throw an exception. + */ + private void setOperCoderException() { + oper = new MyOper() { + @Override + protected Coder makeCoder() { + return new StandardCoder() { + @Override + public String encode(Object object, boolean pretty) throws CoderException { + throw new CoderException(EXPECTED_EXCEPTION); + } + }; + } + }; + } + + + @Getter + public static class MyData { + private String text = TEXT; + } + + private class MyOper extends OperationPartial { @Getter private int count = 0; diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParamsTest.java new file mode 100644 index 000000000..1f38ad371 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicActorParamsTest.java @@ -0,0 +1,118 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Consumer; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; + +public class BidirectionalTopicActorParamsTest { + private static final String CONTAINER = "my-container"; + + private static final String DFLT_SOURCE = "default-source"; + private static final String DFLT_SINK = "default-target"; + private static final int DFLT_TIMEOUT = 10; + + private static final String OPER1_NAME = "oper A"; + private static final String OPER1_SOURCE = "source A"; + private static final String OPER1_SINK = "target A"; + private static final int OPER1_TIMEOUT = 20; + + // oper2 uses some default values + private static final String OPER2_NAME = "oper B"; + private static final String OPER2_SOURCE = "source B"; + + // oper3 uses default values for everything + private static final String OPER3_NAME = "oper C"; + + private Map<String, Map<String, Object>> operMap; + private BidirectionalTopicActorParams params; + + + /** + * Sets up. + */ + @Before + public void setUp() { + BidirectionalTopicParams oper1 = BidirectionalTopicParams.builder().sourceTopic(OPER1_SOURCE) + .sinkTopic(OPER1_SINK).timeoutSec(OPER1_TIMEOUT).build(); + + Map<String, Object> oper1Map = Util.translateToMap(OPER1_NAME, oper1); + Map<String, Object> oper2Map = Map.of("source", OPER2_SOURCE); + Map<String, Object> oper3Map = Collections.emptyMap(); + operMap = Map.of(OPER1_NAME, oper1Map, OPER2_NAME, oper2Map, OPER3_NAME, oper3Map); + + params = makeBidirectionalTopicActorParams(); + } + + @Test + public void testValidate() { + assertTrue(params.validate(CONTAINER).isValid()); + + // only a few fields are required + BidirectionalTopicActorParams sparse = Util.translate(CONTAINER, Map.of("operation", operMap, "timeoutSec", 1), + BidirectionalTopicActorParams.class); + assertTrue(sparse.validate(CONTAINER).isValid()); + + testValidateField("operation", "null", params2 -> params2.setOperation(null)); + testValidateField("timeoutSec", "minimum", params2 -> params2.setTimeoutSec(-1)); + + // check edge cases + params.setTimeoutSec(0); + assertFalse(params.validate(CONTAINER).isValid()); + + params.setTimeoutSec(1); + assertTrue(params.validate(CONTAINER).isValid()); + } + + private void testValidateField(String fieldName, String expected, + Consumer<BidirectionalTopicActorParams> makeInvalid) { + + // original params should be valid + ValidationResult result = params.validate(CONTAINER); + assertTrue(fieldName, result.isValid()); + + // make invalid params + BidirectionalTopicActorParams params2 = makeBidirectionalTopicActorParams(); + makeInvalid.accept(params2); + result = params2.validate(CONTAINER); + assertFalse(fieldName, result.isValid()); + assertThat(result.getResult()).contains(CONTAINER).contains(fieldName).contains(expected); + } + + private BidirectionalTopicActorParams makeBidirectionalTopicActorParams() { + BidirectionalTopicActorParams params2 = new BidirectionalTopicActorParams(); + params2.setSinkTopic(DFLT_SINK); + params2.setSourceTopic(DFLT_SOURCE); + params2.setTimeoutSec(DFLT_TIMEOUT); + params2.setOperation(operMap); + + return params2; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParamsTest.java index d63c833d1..7e44fa2e1 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/BidirectionalTopicParamsTest.java @@ -29,47 +29,46 @@ import java.util.function.Function; import org.junit.Before; import org.junit.Test; import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams.TopicPairParamsBuilder; +import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams.BidirectionalTopicParamsBuilder; -public class TopicPairParamsTest { +public class BidirectionalTopicParamsTest { private static final String CONTAINER = "my-container"; - private static final String TARGET = "my-target"; + private static final String SINK = "my-sink"; private static final String SOURCE = "my-source"; private static final int TIMEOUT = 10; - private TopicPairParams params; + private BidirectionalTopicParams params; @Before public void setUp() { - params = TopicPairParams.builder().target(TARGET).source(SOURCE).timeoutSec(TIMEOUT).build(); + params = BidirectionalTopicParams.builder().sinkTopic(SINK).sourceTopic(SOURCE).timeoutSec(TIMEOUT).build(); } @Test public void testValidate() { - testValidateField("target", "null", bldr -> bldr.target(null)); - testValidateField("source", "null", bldr -> bldr.source(null)); + assertTrue(params.validate(CONTAINER).isValid()); + + testValidateField("sink", "null", bldr -> bldr.sinkTopic(null)); + testValidateField("source", "null", bldr -> bldr.sourceTopic(null)); testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1)); // check edge cases assertFalse(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid()); assertTrue(params.toBuilder().timeoutSec(1).build().validate(CONTAINER).isValid()); - - // some default values should be valid - assertTrue(TopicPairParams.builder().target(TARGET).source(SOURCE).build().validate(CONTAINER).isValid()); } @Test public void testBuilder_testToBuilder() { - assertEquals(TARGET, params.getTarget()); - assertEquals(SOURCE, params.getSource()); + assertEquals(SINK, params.getSinkTopic()); + assertEquals(SOURCE, params.getSourceTopic()); assertEquals(TIMEOUT, params.getTimeoutSec()); assertEquals(params, params.toBuilder().build()); } private void testValidateField(String fieldName, String expected, - Function<TopicPairParamsBuilder, TopicPairParamsBuilder> makeInvalid) { + Function<BidirectionalTopicParamsBuilder, BidirectionalTopicParamsBuilder> makeInvalid) { // original params should be valid ValidationResult result = params.validate(CONTAINER); diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParamsTest.java new file mode 100644 index 000000000..901420346 --- /dev/null +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/CommonActorParamsTest.java @@ -0,0 +1,137 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actorserviceprovider.parameters; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import lombok.Setter; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; + +public class CommonActorParamsTest { + + private static final String CONTAINER = "my-container"; + + private static final String PATH1 = "path #1"; + private static final String PATH2 = "path #2"; + private static final String URI1 = "uri #1"; + private static final String URI2 = "uri #2"; + private static final String TEXT1 = "hello"; + private static final String TEXT2 = "world"; + private static final String TEXT2B = "bye"; + + private Map<String, Map<String, Object>> operations; + private CommonActorParams params; + + /** + * Initializes {@link #operations} with two items and {@link params} with a fully + * populated object. + */ + @Before + public void setUp() { + operations = new TreeMap<>(); + operations.put(PATH1, Map.of("path", URI1)); + operations.put(PATH2, Map.of("path", URI2, "text2", TEXT2B)); + + params = makeCommonActorParams(); + } + + @Test + public void testMakeOperationParameters() { + Function<String, Map<String, Object>> maker = params.makeOperationParameters(CONTAINER); + assertNull(maker.apply("unknown-operation")); + + Map<String, Object> subparam = maker.apply(PATH1); + assertNotNull(subparam); + assertEquals("{path=uri #1, text1=hello, text2=world}", new TreeMap<>(subparam).toString()); + + subparam = maker.apply(PATH2); + assertNotNull(subparam); + assertEquals("{path=uri #2, text1=hello, text2=bye}", new TreeMap<>(subparam).toString()); + } + + @Test + public void testDoValidation() { + assertThatCode(() -> params.doValidation(CONTAINER)).doesNotThrowAnyException(); + + // invalid param + params.setOperation(null); + assertThatThrownBy(() -> params.doValidation(CONTAINER)) + .isInstanceOf(ParameterValidationRuntimeException.class); + } + + @Test + public void testValidate() { + assertTrue(params.validate(CONTAINER).isValid()); + + // only a few fields are required + CommonActorParams sparse = Util.translate(CONTAINER, Map.of("operation", operations, "timeoutSec", 1), + CommonActorParams.class); + assertTrue(sparse.validate(CONTAINER).isValid()); + + testValidateField("operation", "null", params2 -> params2.setOperation(null)); + } + + private void testValidateField(String fieldName, String expected, Consumer<CommonActorParams> makeInvalid) { + + // original params should be valid + ValidationResult result = params.validate(CONTAINER); + assertTrue(fieldName, result.isValid()); + + // make invalid params + CommonActorParams params2 = makeCommonActorParams(); + makeInvalid.accept(params2); + result = params2.validate(CONTAINER); + assertFalse(fieldName, result.isValid()); + assertThat(result.getResult()).contains(CONTAINER).contains(fieldName).contains(expected); + } + + private CommonActorParams makeCommonActorParams() { + MyParams params2 = new MyParams(); + params2.setOperation(operations); + params2.setText1(TEXT1); + params2.setText2(TEXT2); + + return params2; + } + + @Setter + public static class MyParams extends CommonActorParams { + @SuppressWarnings("unused") + private String text1; + + @SuppressWarnings("unused") + private String text2; + } +} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java index daa0affec..9e708535f 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpActorParamsTest.java @@ -21,21 +21,16 @@ package org.onap.policy.controlloop.actorserviceprovider.parameters; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.Map; import java.util.TreeMap; import java.util.function.Consumer; -import java.util.function.Function; import org.junit.Before; import org.junit.Test; import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Util; public class HttpActorParamsTest { @@ -48,63 +43,40 @@ public class HttpActorParamsTest { private static final String URI1 = "uri #1"; private static final String URI2 = "uri #2"; - private Map<String, String> paths; + private Map<String, Map<String, Object>> operations; private HttpActorParams params; /** - * Initializes {@link #paths} with two items and {@link params} with a fully populated - * object. + * Initializes {@link #operations} with two items and {@link params} with a fully + * populated object. */ @Before public void setUp() { - paths = new TreeMap<>(); - paths.put(PATH1, URI1); - paths.put(PATH2, URI2); + operations = new TreeMap<>(); + operations.put(PATH1, Map.of("path", URI1)); + operations.put(PATH2, Map.of("path", URI2)); params = makeHttpActorParams(); } @Test - public void testMakeOperationParameters() { - Function<String, Map<String, Object>> maker = params.makeOperationParameters(CONTAINER); - assertNull(maker.apply("unknown-operation")); - - Map<String, Object> subparam = maker.apply(PATH1); - assertNotNull(subparam); - assertEquals("{clientName=my-client, path=uri #1, timeoutSec=10}", new TreeMap<>(subparam).toString()); - - subparam = maker.apply(PATH2); - assertNotNull(subparam); - assertEquals("{clientName=my-client, path=uri #2, timeoutSec=10}", new TreeMap<>(subparam).toString()); - } - - @Test - public void testDoValidation() { - assertThatCode(() -> params.doValidation(CONTAINER)).doesNotThrowAnyException(); - - // invalid param - params.setClientName(null); - assertThatThrownBy(() -> params.doValidation(CONTAINER)) - .isInstanceOf(ParameterValidationRuntimeException.class); - } - - @Test public void testValidate() { assertTrue(params.validate(CONTAINER).isValid()); - testValidateField("clientName", "null", params2 -> params2.setClientName(null)); - testValidateField("path", "null", params2 -> params2.setPath(null)); + // only a few fields are required + HttpActorParams sparse = Util.translate(CONTAINER, Map.of("operation", operations, "timeoutSec", 1), + HttpActorParams.class); + assertTrue(sparse.validate(CONTAINER).isValid()); + + testValidateField("operation", "null", params2 -> params2.setOperation(null)); testValidateField("timeoutSec", "minimum", params2 -> params2.setTimeoutSec(-1)); // check edge cases params.setTimeoutSec(0); - assertTrue(params.validate(CONTAINER).isValid()); + assertFalse(params.validate(CONTAINER).isValid()); params.setTimeoutSec(1); assertTrue(params.validate(CONTAINER).isValid()); - - // one path value is null - testValidateField(PATH2, "null", params2 -> paths.put(PATH2, null)); } private void testValidateField(String fieldName, String expected, Consumer<HttpActorParams> makeInvalid) { @@ -125,7 +97,7 @@ public class HttpActorParamsTest { HttpActorParams params2 = new HttpActorParams(); params2.setClientName(CLIENT); params2.setTimeoutSec(TIMEOUT); - params2.setPath(paths); + params2.setOperation(operations); return params2; } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java index ae4a79fe2..fdfb4b495 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/HttpParamsTest.java @@ -54,7 +54,7 @@ public class HttpParamsTest { testValidateField("timeoutSec", "minimum", bldr -> bldr.timeoutSec(-1)); // check edge cases - assertTrue(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid()); + assertFalse(params.toBuilder().timeoutSec(0).build().validate(CONTAINER).isValid()); assertTrue(params.toBuilder().timeoutSec(1).build().validate(CONTAINER).isValid()); } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java deleted file mode 100644 index 4322c5f39..000000000 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/parameters/TopicPairActorParamsTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.actorserviceprovider.parameters; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -import java.util.Collections; -import java.util.Map; -import java.util.TreeMap; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.controlloop.actorserviceprovider.Util; - -public class TopicPairActorParamsTest { - private static final String MY_NAME = "my-name"; - - private static final String DFLT_SOURCE = "default-source"; - private static final String DFLT_TARGET = "default-target"; - private static final int DFLT_TIMEOUT = 10; - - private static final String OPER1_NAME = "oper A"; - private static final String OPER1_SOURCE = "source A"; - private static final String OPER1_TARGET = "target A"; - private static final int OPER1_TIMEOUT = 20; - - // oper2 uses some default values - private static final String OPER2_NAME = "oper B"; - private static final String OPER2_SOURCE = "source B"; - - // oper3 uses default values for everything - private static final String OPER3_NAME = "oper C"; - - private TopicPairParams defaults; - private Map<String, Map<String, Object>> operMap; - private TopicPairActorParams params; - - - /** - * Sets up. - */ - @Before - public void setUp() { - defaults = TopicPairParams.builder().source(DFLT_SOURCE).target(DFLT_TARGET).timeoutSec(DFLT_TIMEOUT).build(); - - TopicPairParams oper1 = TopicPairParams.builder().source(OPER1_SOURCE).target(OPER1_TARGET) - .timeoutSec(OPER1_TIMEOUT).build(); - - Map<String, Object> oper1Map = Util.translateToMap(OPER1_NAME, oper1); - Map<String, Object> oper2Map = Map.of("source", OPER2_SOURCE); - Map<String, Object> oper3Map = Collections.emptyMap(); - operMap = Map.of(OPER1_NAME, oper1Map, OPER2_NAME, oper2Map, OPER3_NAME, oper3Map); - - params = TopicPairActorParams.builder().defaults(defaults).operation(operMap).build(); - - } - - @Test - public void testTopicPairActorParams() { - assertSame(defaults, params.getDefaults()); - assertSame(operMap, params.getOperation()); - } - - @Test - public void testDoValidation() { - assertSame(params, params.doValidation(MY_NAME)); - - // test with invalid parameters - defaults.setTimeoutSec(-1); - assertThatThrownBy(() -> params.doValidation(MY_NAME)).isInstanceOf(ParameterValidationRuntimeException.class); - } - - @Test - public void testValidate() { - ValidationResult result; - - // null defaults - params.setDefaults(null); - result = params.validate(MY_NAME); - assertFalse(result.isValid()); - assertThat(result.getResult()).contains("defaults").contains("null"); - params.setDefaults(defaults); - - // invalid value in defaults - defaults.setTimeoutSec(-1); - result = params.validate(MY_NAME); - assertFalse(result.isValid()); - assertThat(result.getResult()).contains("defaults").contains("timeoutSec"); - defaults.setTimeoutSec(DFLT_TIMEOUT); - - // null map - params.setOperation(null); - result = params.validate(MY_NAME); - assertFalse(result.isValid()); - assertThat(result.getResult()).contains("operation"); - params.setOperation(operMap); - - // null entry in the map - Map<String, Map<String, Object>> map2 = new TreeMap<>(operMap); - map2.put(OPER2_NAME, null); - params.setOperation(map2); - result = params.validate(MY_NAME); - assertFalse(result.isValid()); - assertThat(result.getResult()).contains("operation").contains("null"); - params.setOperation(operMap); - - // test success case - assertTrue(params.validate(MY_NAME).isValid()); - } -} diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandlerTest.java index c6557d0c9..54d56de53 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicPairTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/BidirectionalTopicHandlerTest.java @@ -20,18 +20,17 @@ package org.onap.policy.controlloop.actorserviceprovider.topic; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; -import java.util.List; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -40,114 +39,101 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; -public class TopicPairTest { +public class BidirectionalTopicHandlerTest { private static final String UNKNOWN = "unknown"; - private static final String MY_SOURCE = "pair-source"; - private static final String MY_TARGET = "pair-target"; - private static final String TEXT = "some text"; + private static final String MY_SOURCE = "my-source"; + private static final String MY_SINK = "my-sink"; + private static final String KEY1 = "requestId"; + private static final String KEY2 = "subRequestId"; @Mock - private TopicSink publisher1; + private TopicSink publisher; @Mock - private TopicSink publisher2; - - @Mock - private TopicSource subscriber1; - - @Mock - private TopicSource subscriber2; + private TopicSource subscriber; @Mock private TopicEndpoint mgr; - private TopicPair pair; + private MyTopicHandler handler; /** * Sets up. */ @Before - public void setUp() { + public void setUp() throws BidirectionalTopicClientException { MockitoAnnotations.initMocks(this); - when(mgr.getTopicSinks(MY_TARGET)).thenReturn(Arrays.asList(publisher1, publisher2)); - when(mgr.getTopicSources(eq(Arrays.asList(MY_SOURCE)))).thenReturn(Arrays.asList(subscriber1, subscriber2)); + when(mgr.getTopicSinks(MY_SINK)).thenReturn(Arrays.asList(publisher)); + when(mgr.getTopicSources(eq(Arrays.asList(MY_SOURCE)))).thenReturn(Arrays.asList(subscriber)); - when(publisher1.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.NOOP); - when(publisher2.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.UEB); + when(publisher.getTopicCommInfrastructure()).thenReturn(CommInfrastructure.NOOP); - pair = new MyTopicPair(MY_SOURCE, MY_TARGET); + handler = new MyTopicHandler(MY_SINK, MY_SOURCE); - pair.start(); + handler.start(); } @Test - public void testTopicPair_testGetSource_testGetTarget() { - assertEquals(MY_SOURCE, pair.getSource()); - assertEquals(MY_TARGET, pair.getTarget()); + public void testBidirectionalTopicHandler_testGetSource_testGetTarget() { + assertEquals(MY_SOURCE, handler.getSourceTopic()); + assertEquals(MY_SINK, handler.getSinkTopic()); verify(mgr).getTopicSinks(anyString()); verify(mgr).getTopicSources(any()); // source not found - assertThatIllegalArgumentException().isThrownBy(() -> new MyTopicPair(UNKNOWN, MY_TARGET)) - .withMessageContaining("sources").withMessageContaining(UNKNOWN); + assertThatThrownBy(() -> new MyTopicHandler(MY_SINK, UNKNOWN)) + .isInstanceOf(BidirectionalTopicClientException.class).hasMessageContaining("sources") + .hasMessageContaining(UNKNOWN); // target not found - assertThatIllegalArgumentException().isThrownBy(() -> new MyTopicPair(MY_SOURCE, UNKNOWN)) - .withMessageContaining("sinks").withMessageContaining(UNKNOWN); + assertThatThrownBy(() -> new MyTopicHandler(UNKNOWN, MY_SOURCE)) + .isInstanceOf(BidirectionalTopicClientException.class).hasMessageContaining("sinks") + .hasMessageContaining(UNKNOWN); } @Test public void testShutdown() { - pair.shutdown(); - verify(subscriber1).unregister(pair); - verify(subscriber2).unregister(pair); + handler.shutdown(); + verify(subscriber).unregister(any()); } @Test public void testStart() { - verify(subscriber1).register(pair); - verify(subscriber2).register(pair); + verify(subscriber).register(any()); } @Test public void testStop() { - pair.stop(); - verify(subscriber1).unregister(pair); - verify(subscriber2).unregister(pair); + handler.stop(); + verify(subscriber).unregister(any()); } @Test - public void testPublish() { - List<CommInfrastructure> infrastructures = pair.publish(TEXT); - assertEquals(Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB), infrastructures); - - verify(publisher1).send(TEXT); - verify(publisher2).send(TEXT); - - // first one throws an exception - should have only published to the second - when(publisher1.send(any())).thenThrow(new IllegalStateException("expected exception")); - - infrastructures = pair.publish(TEXT); - assertEquals(Arrays.asList(CommInfrastructure.UEB), infrastructures); + public void testAddForwarder() { + // array form + Forwarder forwarder = handler.addForwarder(new SelectorKey(KEY1), new SelectorKey(KEY2)); + assertNotNull(forwarder); - verify(publisher2, times(2)).send(TEXT); + // repeat using list form + assertSame(forwarder, handler.addForwarder(Arrays.asList(new SelectorKey(KEY1), new SelectorKey(KEY2)))); } @Test public void testGetTopicEndpointManager() { // setting "mgr" to null should cause it to use the superclass' method mgr = null; - assertNotNull(pair.getTopicEndpointManager()); + assertNotNull(handler.getTopicEndpointManager()); } - private class MyTopicPair extends TopicPair { - public MyTopicPair(String source, String target) { - super(source, target); + private class MyTopicHandler extends BidirectionalTopicHandler { + public MyTopicHandler(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { + super(sinkTopic, sourceTopic); } @Override diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java index 24f8b70a8..a01159bc2 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/ForwarderTest.java @@ -29,17 +29,15 @@ import static org.mockito.Mockito.verify; import java.util.Arrays; import java.util.Map; +import java.util.function.BiConsumer; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.onap.policy.controlloop.actorserviceprovider.Util; public class ForwarderTest { - private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; private static final String TEXT = "some text"; private static final String KEY1 = "requestId"; @@ -58,16 +56,16 @@ public class ForwarderTest { private static final String VALUEC_SUBREQID = "bye-bye"; @Mock - private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1; + private BiConsumer<String, StandardCoderObject> listener1; @Mock - private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1b; + private BiConsumer<String, StandardCoderObject> listener1b; @Mock - private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener2; + private BiConsumer<String, StandardCoderObject> listener2; @Mock - private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener3; + private BiConsumer<String, StandardCoderObject> listener3; private Forwarder forwarder; @@ -102,68 +100,68 @@ public class ForwarderTest { forwarder.unregister(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1b); StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); - forwarder.onMessage(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); - verify(listener1).accept(INFRA, TEXT, sco); - verify(listener1b, never()).accept(any(), any(), any()); + verify(listener1).accept(TEXT, sco); + verify(listener1b, never()).accept(any(), any()); // remove listener1 forwarder.unregister(Arrays.asList(VALUEA_REQID, VALUEA_SUBREQID), listener1); - forwarder.onMessage(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); // route a message to listener2 sco = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEB_SUBREQID))); - forwarder.onMessage(INFRA, TEXT, sco); - verify(listener2).accept(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); + verify(listener2).accept(TEXT, sco); // no more messages to listener1 or 1b - verify(listener1).accept(any(), any(), any()); - verify(listener1b, never()).accept(any(), any(), any()); + verify(listener1).accept(any(), any()); + verify(listener1b, never()).accept(any(), any()); } @Test public void testOnMessage() { StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); - forwarder.onMessage(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); - verify(listener1).accept(INFRA, TEXT, sco); - verify(listener1b).accept(INFRA, TEXT, sco); + verify(listener1).accept(TEXT, sco); + verify(listener1b).accept(TEXT, sco); // repeat - counts should increment - forwarder.onMessage(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); - verify(listener1, times(2)).accept(INFRA, TEXT, sco); - verify(listener1b, times(2)).accept(INFRA, TEXT, sco); + verify(listener1, times(2)).accept(TEXT, sco); + verify(listener1b, times(2)).accept(TEXT, sco); // should not have been invoked - verify(listener2, never()).accept(any(), any(), any()); - verify(listener3, never()).accept(any(), any(), any()); + verify(listener2, never()).accept(any(), any()); + verify(listener3, never()).accept(any(), any()); // try other listeners now sco = makeMessage(Map.of(KEY1, VALUEB_REQID, KEY2, Map.of(SUBKEY, VALUEB_SUBREQID))); - forwarder.onMessage(INFRA, TEXT, sco); - verify(listener2).accept(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); + verify(listener2).accept(TEXT, sco); sco = makeMessage(Map.of(KEY1, VALUEC_REQID, KEY2, Map.of(SUBKEY, VALUEC_SUBREQID))); - forwarder.onMessage(INFRA, TEXT, sco); - verify(listener3).accept(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); + verify(listener3).accept(TEXT, sco); // message has no listeners sco = makeMessage(Map.of(KEY1, "xyzzy", KEY2, Map.of(SUBKEY, VALUEB_SUBREQID))); - forwarder.onMessage(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); // message doesn't have both keys sco = makeMessage(Map.of(KEY1, VALUEA_REQID)); - forwarder.onMessage(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); // counts should not have incremented - verify(listener1, times(2)).accept(any(), any(), any()); - verify(listener1b, times(2)).accept(any(), any(), any()); - verify(listener2).accept(any(), any(), any()); - verify(listener3).accept(any(), any(), any()); + verify(listener1, times(2)).accept(any(), any()); + verify(listener1b, times(2)).accept(any(), any()); + verify(listener2).accept(any(), any()); + verify(listener3).accept(any(), any()); // listener throws an exception - doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any(), any()); + doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any()); } /* @@ -171,12 +169,12 @@ public class ForwarderTest { */ @Test public void testOnMessageListenerException1() { - doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any(), any()); + doThrow(new IllegalStateException("expected exception")).when(listener1).accept(any(), any()); StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); - forwarder.onMessage(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); - verify(listener1b).accept(INFRA, TEXT, sco); + verify(listener1b).accept(TEXT, sco); } /* @@ -184,12 +182,12 @@ public class ForwarderTest { */ @Test public void testOnMessageListenerException1b() { - doThrow(new IllegalStateException("expected exception")).when(listener1b).accept(any(), any(), any()); + doThrow(new IllegalStateException("expected exception")).when(listener1b).accept(any(), any()); StandardCoderObject sco = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); - forwarder.onMessage(INFRA, TEXT, sco); + forwarder.onMessage(TEXT, sco); - verify(listener1).accept(INFRA, TEXT, sco); + verify(listener1).accept(TEXT, sco); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java index a37085799..3012ff6af 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImplTest.java @@ -30,12 +30,12 @@ import static org.mockito.Mockito.verify; import java.util.Arrays; import java.util.Map; +import java.util.function.BiConsumer; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.common.utils.coder.StandardCoderObject; @@ -58,13 +58,13 @@ public class TopicListenerImplTest { private TopicListenerImpl topic; @Mock - private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1; + private BiConsumer<String, StandardCoderObject> listener1; @Mock - private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener1b; + private BiConsumer<String, StandardCoderObject> listener1b; @Mock - private TriConsumer<CommInfrastructure, String, StandardCoderObject> listener2; + private BiConsumer<String, StandardCoderObject> listener2; /** @@ -117,11 +117,11 @@ public class TopicListenerImplTest { String msg = makeMessage(Map.of(KEY1, VALUEA_REQID, KEY2, Map.of(SUBKEY, VALUEA_SUBREQID))); topic.onTopicEvent(INFRA, MY_TOPIC, msg); - verify(listener1).accept(eq(INFRA), eq(msg), any()); - verify(listener2).accept(eq(INFRA), eq(msg), any()); + verify(listener1).accept(eq(msg), any()); + verify(listener2).accept(eq(msg), any()); // not to listener1b - verify(listener1b, never()).accept(any(), any(), any()); + verify(listener1b, never()).accept(any(), any()); /* * now send a message that should only go to listener1b on forwarder1 @@ -130,15 +130,15 @@ public class TopicListenerImplTest { topic.onTopicEvent(INFRA, MY_TOPIC, msg); // should route to listener1 on forwarder1 and listener2 on forwarder2 - verify(listener1b).accept(eq(INFRA), eq(msg), any()); + verify(listener1b).accept(eq(msg), any()); // try one where the coder throws an exception topic.onTopicEvent(INFRA, MY_TOPIC, "{invalid-json"); // no extra invocations - verify(listener1).accept(any(), any(), any()); - verify(listener1b).accept(any(), any(), any()); - verify(listener2).accept(any(), any(), any()); + verify(listener1).accept(any(), any()); + verify(listener1b).accept(any(), any()); + verify(listener2).accept(any(), any()); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml index b11983bed..7b5b9fc32 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml +++ b/models-interactions/model-actors/actorServiceProvider/src/test/resources/logback-test.xml @@ -40,14 +40,9 @@ <appender-ref ref="STDOUT" /> </logger> - <!-- this is required for HttpOperationTest --> - <logger name="org.onap.policy.controlloop.actorserviceprovider.impl.HttpOperation" level="info" additivity="false"> - <appender-ref ref="STDOUT" /> - </logger> - - <!-- this is required for TopicPairOperationTest --> + <!-- this is required for OperationPartialTest --> <logger - name="org.onap.policy.controlloop.actorserviceprovider.impl.TopicPairOperation" + name="org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial" level="info" additivity="false"> <appender-ref ref="STDOUT" /> </logger> |