From 7d3f5bfd2e4fefe02c7f2fcf59981bb33f026419 Mon Sep 17 00:00:00 2001 From: "adheli.tavares" Date: Thu, 14 Dec 2023 10:05:49 +0000 Subject: Apply lower case to any topics to be compatible with Kafka. Issue-ID: POLICY-4402 Change-Id: Iebaec5f52a1fa0feb881ccfcb5319bc8a951b496 Signed-off-by: adheli.tavares --- .../src/main/resources/META-INF/persistence.xml | 1 + .../src/main/resources/META-INF/persistence.xml | 1 + .../common/endpoints/event/comm/TopicSink.java | 5 +- .../endpoints/event/comm/bus/BusTopicSink.java | 5 +- .../comm/bus/IndexedKafkaTopicSinkFactory.java | 9 ++-- .../comm/bus/IndexedKafkaTopicSourceFactory.java | 9 ++-- .../event/comm/bus/KafkaTopicSourceFactory.java | 16 +++--- .../event/comm/bus/internal/BusTopicParams.java | 27 ++++++++-- .../comm/bus/internal/InlineBusTopicSink.java | 23 +-------- .../comm/bus/internal/InlineKafkaTopicSink.java | 8 +-- .../internal/SingleThreadedKafkaTopicSource.java | 4 +- .../event/comm/bus/internal/TopicBase.java | 4 +- .../comm/client/BidirectionalTopicClient.java | 58 ++++++++++++---------- .../client/BidirectionalTopicClientException.java | 4 ++ .../event/comm/client/TopicSinkClient.java | 4 +- .../comm/client/TopicSinkClientException.java | 5 +- 16 files changed, 101 insertions(+), 82 deletions(-) diff --git a/integrity-audit/src/main/resources/META-INF/persistence.xml b/integrity-audit/src/main/resources/META-INF/persistence.xml index 3a7fdd7f..63fea7b1 100644 --- a/integrity-audit/src/main/resources/META-INF/persistence.xml +++ b/integrity-audit/src/main/resources/META-INF/persistence.xml @@ -37,6 +37,7 @@ + org.hibernate.jpa.HibernatePersistenceProvider org.onap.policy.common.ia.jpa.IntegrityAuditEntity NONE diff --git a/integrity-monitor/src/main/resources/META-INF/persistence.xml b/integrity-monitor/src/main/resources/META-INF/persistence.xml index 0adaae96..beff1432 100644 --- a/integrity-monitor/src/main/resources/META-INF/persistence.xml +++ b/integrity-monitor/src/main/resources/META-INF/persistence.xml @@ -25,6 +25,7 @@ xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd"> + org.hibernate.jpa.HibernatePersistenceProvider org.onap.policy.common.im.jpa.ImTestEntity org.onap.policy.common.im.jpa.StateManagementEntity org.onap.policy.common.im.jpa.ForwardProgressEntity diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java index f36dfa31..b67756e5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,9 +33,9 @@ public interface TopicSink extends Topic { * * @return true if the send operation succeeded, false otherwise * @throws IllegalArgumentException an invalid message has been provided - * @throws IllegalStateException the entity is in an state that prevents + * @throws IllegalStateException the entity is in a state that prevents * it from sending messages, for example, locked or stopped. */ - public boolean send(String message); + boolean send(String message); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java index e77beea1..5ca87732 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,12 +33,12 @@ public interface BusTopicSink extends ApiKeyEnabled, TopicSink { * * @param partitionKey the partition key */ - public void setPartitionKey(String partitionKey); + void setPartitionKey(String partitionKey); /** * Return the partition key in used by the system to publish messages. * * @return the partition key */ - public String getPartitionKey(); + String getPartitionKey(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java index 23aaabd4..f913926e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class); + private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class); /** * KAFKA Topic Name Index. @@ -98,7 +98,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory { List newKafkaTopicSinks = new ArrayList<>(); synchronized (this) { for (String topic : COMMA_SPACE_PAT.split(writeTopics)) { - addTopic(newKafkaTopicSinks, topic, properties); + addTopic(newKafkaTopicSinks, topic.toLowerCase(), properties); } return newKafkaTopicSinks; } @@ -113,7 +113,8 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory { String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic; var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic sink {} ", + this, name, value, topic)); String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); if (StringUtils.isBlank(servers)) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java index 1d586e43..151d8f69 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java @@ -42,7 +42,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class); + private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class); /** * KAFKA Topic Name Index. @@ -84,7 +84,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { List newKafkaTopicSources = new ArrayList<>(); synchronized (this) { for (String topic : COMMA_SPACE_PAT.split(readTopics)) { - addTopic(newKafkaTopicSources, topic, properties); + addTopic(newKafkaTopicSources, topic.toLowerCase(), properties); } } return newKafkaTopicSources; @@ -110,11 +110,12 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic; var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic source {} ", + this, name, value, topic)); String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); if (StringUtils.isBlank(servers)) { - logger.error("{}: no KAFKA servers configured for sink {}", this, topic); + logger.error("{}: no KAFKA servers configured for source {}", this, topic); return; } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java index 8cb51df8..e5642daa 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,11 +28,11 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; public interface KafkaTopicSourceFactory { /** - * Creates an Kafka Topic Source based on properties files. + * Creates a Kafka Topic Source based on properties files. * * @param properties Properties containing initialization values * - * @return an Kafka Topic Source + * @return a Kafka Topic Source * @throws IllegalArgumentException if invalid parameters are present */ List build(Properties properties); @@ -41,7 +41,7 @@ public interface KafkaTopicSourceFactory { * Instantiates a new Kafka Topic Source. * * @param busTopicParams parameters object - * @return an Kafka Topic Source + * @return a Kafka Topic Source */ KafkaTopicSource build(BusTopicParams busTopicParams); @@ -51,13 +51,13 @@ public interface KafkaTopicSourceFactory { * @param servers list of servers * @param topic topic name * - * @return an Kafka Topic Source + * @return a Kafka Topic Source * @throws IllegalArgumentException if invalid parameters are present */ KafkaTopicSource build(List servers, String topic); /** - * Destroys an Kafka Topic Source based on a topic. + * Destroys a Kafka Topic Source based on a topic. * * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present @@ -70,10 +70,10 @@ public interface KafkaTopicSourceFactory { void destroy(); /** - * Gets an Kafka Topic Source based on topic name. + * Gets a Kafka Topic Source based on topic name. * * @param topic the topic name - * @return an Kafka Topic Source with topic name + * @return a Kafka Topic Source with topic name * @throws IllegalArgumentException if an invalid topic is provided * @throws IllegalStateException if the Kafka Topic Source is an incorrect state */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index f9537f52..d6fa21b6 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019, 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -168,6 +168,26 @@ public class BusTopicParams { return additionalProps != null; } + public void setEffectiveTopic(String effectiveTopic) { + this.effectiveTopic = topicToLowerCase(effectiveTopic); + } + + public void setTopic(String topic) { + this.topic = topicToLowerCase(topic); + } + + public String getEffectiveTopic() { + return topicToLowerCase(effectiveTopic); + } + + public String getTopic() { + return topicToLowerCase(topic); + } + + private String topicToLowerCase(String topic) { + return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase(); + } + @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class TopicParamsBuilder { @@ -179,12 +199,12 @@ public class BusTopicParams { } public TopicParamsBuilder topic(String topic) { - this.params.topic = topic; + this.params.setTopic(topic); return this; } public TopicParamsBuilder effectiveTopic(String effectiveTopic) { - this.params.effectiveTopic = effectiveTopic; + this.params.setEffectiveTopic(effectiveTopic); return this; } @@ -306,7 +326,6 @@ public class BusTopicParams { this.params.serializationProvider = serializationProvider; return this; } - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index 27ed5e7a..02626d34 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -158,28 +159,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi this.stop(); } - @Override - protected boolean anyNullOrEmpty(String... args) { - for (String arg : args) { - if (arg == null || arg.isEmpty()) { - return true; - } - } - - return false; - } - - @Override - protected boolean allNullOrEmpty(String... args) { - for (String arg : args) { - if (!(arg == null || arg.isEmpty())) { - return false; - } - } - - return true; - } - @Override public String toString() { return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java index 6574d408..f605de92 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,12 +33,12 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); - protected Map additionalProps = null; + protected Map additionalProps; /** - * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned + * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below * attributes. * *

servers list of KAFKA servers available for publishing diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index 2a651ee7..713b4fd1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; /** - * This topic source implementation specializes in reading messages over an Kafka Bus topic source and + * This topic source implementation specializes in reading messages over a Kafka Bus topic source and * notifying its listeners. */ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java index 3372e0aa..c63fbcc2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java @@ -117,8 +117,8 @@ public abstract class TopicBase implements Topic { } this.servers = servers; - this.topic = topic; - this.effectiveTopic = effectiveTopicCopy; + this.topic = topic.toLowerCase(); + this.effectiveTopic = effectiveTopicCopy.toLowerCase(); } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java index 2a9f1446..b5d53909 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +22,13 @@ package org.onap.policy.common.endpoints.event.comm.client; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import lombok.Getter; +import org.jetbrains.annotations.NotNull; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; @@ -55,9 +58,9 @@ public class BidirectionalTopicClient { private final CommInfrastructure sourceTopicCommInfrastructure; /** - * Used when checking whether or not a message sent on the sink topic can be received + * Used when checking whether a message sent on the sink topic can be received * on the source topic. When a matching message is received on the incoming topic, - * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting + * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting * thread is interrupted, then {@code false} is placed on the queue. Whenever a value * is pulled from the queue, it is immediately placed back on the queue. */ @@ -72,8 +75,8 @@ public class BidirectionalTopicClient { * @throws BidirectionalTopicClientException if either topic does not exist */ public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { - this.sinkTopic = sinkTopic; - this.sourceTopic = sourceTopic; + this.sinkTopic = sinkTopic.toLowerCase(); + this.sourceTopic = sourceTopic.toLowerCase(); // init sinkClient List sinks = getTopicEndpointManager().getTopicSinks(sinkTopic); @@ -86,7 +89,7 @@ public class BidirectionalTopicClient { this.sink = sinks.get(0); // init source - List sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic)); + List sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic)); if (sources.isEmpty()) { throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic); } else if (sources.size() > 1) { @@ -116,7 +119,7 @@ public class BidirectionalTopicClient { } /** - * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has + * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has * previously returned {@code true}). * * @return {@code true}, if the topic is ready to send and receive @@ -129,7 +132,7 @@ public class BidirectionalTopicClient { * Waits for the bidirectional topic to become "ready" by publishing a message on the * sink topic and awaiting receipt of the message on the source topic. If the message * is not received within a few seconds, then it tries again. This process is - * continued until the message is received, {@link #stop()} is called, or this thread + * continued until the message is received, {@link #stopWaiting()} is called, or this thread * is interrupted. Once this returns, subsequent calls will return immediately, always * with the same value. * @@ -150,24 +153,7 @@ public class BidirectionalTopicClient { final String messageText = coder.encode(message); // class of message to be decoded - @SuppressWarnings("unchecked") - final Class clazz = (Class) message.getClass(); - - // create a listener to detect when a matching message is received - final TopicListener listener = (infra, topic, msg) -> { - try { - T incoming = decode(msg, clazz); - - if (message.equals(incoming)) { - logger.info("topic {} is ready; found matching message {}", topic, incoming); - checkerQueue.add(Boolean.TRUE); - } - - } catch (CoderException e) { - logger.warn("cannot decode message from topic {}", topic, e); - decodeFailed(); - } - }; + final TopicListener listener = getTopicListener(message); source.register(listener); @@ -193,6 +179,28 @@ public class BidirectionalTopicClient { return checkerQueue.peek(); } + @NotNull + private TopicListener getTopicListener(T message) { + @SuppressWarnings("unchecked") + final Class clazz = (Class) message.getClass(); + + // create a listener to detect when a matching message is received + return (infra, topic, msg) -> { + try { + T incoming = decode(msg, clazz); + + if (message.equals(incoming)) { + logger.info("topic {} is ready; found matching message {}", topic, incoming); + checkerQueue.add(Boolean.TRUE); + } + + } catch (CoderException e) { + logger.warn("cannot decode message from topic {}", topic, e); + decodeFailed(); + } + }; + } + /** * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by * adding {@code false} to the queue. diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java index 3a5d727b..1037d3af 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +21,13 @@ package org.onap.policy.common.endpoints.event.comm.client; +import java.io.Serial; + /** * Exception thrown by BidirectionalTopicClient class. */ public class BidirectionalTopicClientException extends Exception { + @Serial private static final long serialVersionUID = 1L; public BidirectionalTopicClientException() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java index 9f8b3c06..5f49ea34 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java @@ -56,9 +56,9 @@ public class TopicSinkClient { * @throws TopicSinkClientException if the topic does not exist */ public TopicSinkClient(final String topic) throws TopicSinkClientException { - final List lst = getTopicSinks(topic); + final List lst = getTopicSinks(topic.toLowerCase()); if (lst.isEmpty()) { - throw new TopicSinkClientException("no sinks for topic: " + topic); + throw new TopicSinkClientException("no sinks for topic: " + topic.toLowerCase()); } this.sink = lst.get(0); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java index 608393b3..431d4f34 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java @@ -3,7 +3,7 @@ * ONAP PAP * ================================================================================ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019, 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +21,13 @@ package org.onap.policy.common.endpoints.event.comm.client; +import java.io.Serial; + /** * Exception thrown by TopicSink client classes. */ public class TopicSinkClientException extends Exception { + @Serial private static final long serialVersionUID = 1L; public TopicSinkClientException() { -- cgit 1.2.3-korg