diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus')
9 files changed, 53 insertions, 52 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java index e77beea1..5ca87732 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,12 +33,12 @@ public interface BusTopicSink extends ApiKeyEnabled, TopicSink { * * @param partitionKey the partition key */ - public void setPartitionKey(String partitionKey); + void setPartitionKey(String partitionKey); /** * Return the partition key in used by the system to publish messages. * * @return the partition key */ - public String getPartitionKey(); + String getPartitionKey(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java index 23aaabd4..f913926e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class); + private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class); /** * KAFKA Topic Name Index. @@ -98,7 +98,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory { List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>(); synchronized (this) { for (String topic : COMMA_SPACE_PAT.split(writeTopics)) { - addTopic(newKafkaTopicSinks, topic, properties); + addTopic(newKafkaTopicSinks, topic.toLowerCase(), properties); } return newKafkaTopicSinks; } @@ -113,7 +113,8 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory { String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic; var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic sink {} ", + this, name, value, topic)); String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); if (StringUtils.isBlank(servers)) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java index 1d586e43..151d8f69 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java @@ -42,7 +42,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class); + private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class); /** * KAFKA Topic Name Index. @@ -84,7 +84,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>(); synchronized (this) { for (String topic : COMMA_SPACE_PAT.split(readTopics)) { - addTopic(newKafkaTopicSources, topic, properties); + addTopic(newKafkaTopicSources, topic.toLowerCase(), properties); } } return newKafkaTopicSources; @@ -110,11 +110,12 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic; var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic source {} ", + this, name, value, topic)); String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); if (StringUtils.isBlank(servers)) { - logger.error("{}: no KAFKA servers configured for sink {}", this, topic); + logger.error("{}: no KAFKA servers configured for source {}", this, topic); return; } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java index 8cb51df8..e5642daa 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,11 +28,11 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; public interface KafkaTopicSourceFactory { /** - * Creates an Kafka Topic Source based on properties files. + * Creates a Kafka Topic Source based on properties files. * * @param properties Properties containing initialization values * - * @return an Kafka Topic Source + * @return a Kafka Topic Source * @throws IllegalArgumentException if invalid parameters are present */ List<KafkaTopicSource> build(Properties properties); @@ -41,7 +41,7 @@ public interface KafkaTopicSourceFactory { * Instantiates a new Kafka Topic Source. * * @param busTopicParams parameters object - * @return an Kafka Topic Source + * @return a Kafka Topic Source */ KafkaTopicSource build(BusTopicParams busTopicParams); @@ -51,13 +51,13 @@ public interface KafkaTopicSourceFactory { * @param servers list of servers * @param topic topic name * - * @return an Kafka Topic Source + * @return a Kafka Topic Source * @throws IllegalArgumentException if invalid parameters are present */ KafkaTopicSource build(List<String> servers, String topic); /** - * Destroys an Kafka Topic Source based on a topic. + * Destroys a Kafka Topic Source based on a topic. * * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present @@ -70,10 +70,10 @@ public interface KafkaTopicSourceFactory { void destroy(); /** - * Gets an Kafka Topic Source based on topic name. + * Gets a Kafka Topic Source based on topic name. * * @param topic the topic name - * @return an Kafka Topic Source with topic name + * @return a Kafka Topic Source with topic name * @throws IllegalArgumentException if an invalid topic is provided * @throws IllegalStateException if the Kafka Topic Source is an incorrect state */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index f9537f52..d6fa21b6 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019, 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -168,6 +168,26 @@ public class BusTopicParams { return additionalProps != null; } + public void setEffectiveTopic(String effectiveTopic) { + this.effectiveTopic = topicToLowerCase(effectiveTopic); + } + + public void setTopic(String topic) { + this.topic = topicToLowerCase(topic); + } + + public String getEffectiveTopic() { + return topicToLowerCase(effectiveTopic); + } + + public String getTopic() { + return topicToLowerCase(topic); + } + + private String topicToLowerCase(String topic) { + return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase(); + } + @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class TopicParamsBuilder { @@ -179,12 +199,12 @@ public class BusTopicParams { } public TopicParamsBuilder topic(String topic) { - this.params.topic = topic; + this.params.setTopic(topic); return this; } public TopicParamsBuilder effectiveTopic(String effectiveTopic) { - this.params.effectiveTopic = effectiveTopic; + this.params.setEffectiveTopic(effectiveTopic); return this; } @@ -306,7 +326,6 @@ public class BusTopicParams { this.params.serializationProvider = serializationProvider; return this; } - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index 27ed5e7a..02626d34 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -159,28 +160,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi } @Override - protected boolean anyNullOrEmpty(String... args) { - for (String arg : args) { - if (arg == null || arg.isEmpty()) { - return true; - } - } - - return false; - } - - @Override - protected boolean allNullOrEmpty(String... args) { - for (String arg : args) { - if (!(arg == null || arg.isEmpty())) { - return false; - } - } - - return true; - } - - @Override public String toString() { return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher + "]"; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java index 6574d408..f605de92 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,12 +33,12 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); - protected Map<String, String> additionalProps = null; + protected Map<String, String> additionalProps; /** - * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned + * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below * attributes. * * <p>servers list of KAFKA servers available for publishing diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index 2a651ee7..713b4fd1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; /** - * This topic source implementation specializes in reading messages over an Kafka Bus topic source and + * This topic source implementation specializes in reading messages over a Kafka Bus topic source and * notifying its listeners. */ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java index 3372e0aa..c63fbcc2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java @@ -117,8 +117,8 @@ public abstract class TopicBase implements Topic { } this.servers = servers; - this.topic = topic; - this.effectiveTopic = effectiveTopicCopy; + this.topic = topic.toLowerCase(); + this.effectiveTopic = effectiveTopicCopy.toLowerCase(); } @Override |