aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java5
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java9
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java9
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java16
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java27
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java23
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java8
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java4
9 files changed, 53 insertions, 52 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
index e77beea1..5ca87732 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,12 +33,12 @@ public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
*
* @param partitionKey the partition key
*/
- public void setPartitionKey(String partitionKey);
+ void setPartitionKey(String partitionKey);
/**
* Return the partition key in used by the system to publish messages.
*
* @return the partition key
*/
- public String getPartitionKey();
+ String getPartitionKey();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java
index 23aaabd4..f913926e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,7 +42,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
+ private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
/**
* KAFKA Topic Name Index.
@@ -98,7 +98,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>();
synchronized (this) {
for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
- addTopic(newKafkaTopicSinks, topic, properties);
+ addTopic(newKafkaTopicSinks, topic.toLowerCase(), properties);
}
return newKafkaTopicSinks;
}
@@ -113,7 +113,8 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic;
var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+ (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic sink {} ",
+ this, name, value, topic));
String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
if (StringUtils.isBlank(servers)) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
index 1d586e43..151d8f69 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
@@ -42,7 +42,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
+ private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
/**
* KAFKA Topic Name Index.
@@ -84,7 +84,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>();
synchronized (this) {
for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
- addTopic(newKafkaTopicSources, topic, properties);
+ addTopic(newKafkaTopicSources, topic.toLowerCase(), properties);
}
}
return newKafkaTopicSources;
@@ -110,11 +110,12 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic;
var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+ (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic source {} ",
+ this, name, value, topic));
String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
if (StringUtils.isBlank(servers)) {
- logger.error("{}: no KAFKA servers configured for sink {}", this, topic);
+ logger.error("{}: no KAFKA servers configured for source {}", this, topic);
return;
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
index 8cb51df8..e5642daa 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,11 +28,11 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
public interface KafkaTopicSourceFactory {
/**
- * Creates an Kafka Topic Source based on properties files.
+ * Creates a Kafka Topic Source based on properties files.
*
* @param properties Properties containing initialization values
*
- * @return an Kafka Topic Source
+ * @return a Kafka Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
List<KafkaTopicSource> build(Properties properties);
@@ -41,7 +41,7 @@ public interface KafkaTopicSourceFactory {
* Instantiates a new Kafka Topic Source.
*
* @param busTopicParams parameters object
- * @return an Kafka Topic Source
+ * @return a Kafka Topic Source
*/
KafkaTopicSource build(BusTopicParams busTopicParams);
@@ -51,13 +51,13 @@ public interface KafkaTopicSourceFactory {
* @param servers list of servers
* @param topic topic name
*
- * @return an Kafka Topic Source
+ * @return a Kafka Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
KafkaTopicSource build(List<String> servers, String topic);
/**
- * Destroys an Kafka Topic Source based on a topic.
+ * Destroys a Kafka Topic Source based on a topic.
*
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
@@ -70,10 +70,10 @@ public interface KafkaTopicSourceFactory {
void destroy();
/**
- * Gets an Kafka Topic Source based on topic name.
+ * Gets a Kafka Topic Source based on topic name.
*
* @param topic the topic name
- * @return an Kafka Topic Source with topic name
+ * @return a Kafka Topic Source with topic name
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the Kafka Topic Source is an incorrect state
*/
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
index f9537f52..d6fa21b6 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
* Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -168,6 +168,26 @@ public class BusTopicParams {
return additionalProps != null;
}
+ public void setEffectiveTopic(String effectiveTopic) {
+ this.effectiveTopic = topicToLowerCase(effectiveTopic);
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topicToLowerCase(topic);
+ }
+
+ public String getEffectiveTopic() {
+ return topicToLowerCase(effectiveTopic);
+ }
+
+ public String getTopic() {
+ return topicToLowerCase(topic);
+ }
+
+ private String topicToLowerCase(String topic) {
+ return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase();
+ }
+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public static class TopicParamsBuilder {
@@ -179,12 +199,12 @@ public class BusTopicParams {
}
public TopicParamsBuilder topic(String topic) {
- this.params.topic = topic;
+ this.params.setTopic(topic);
return this;
}
public TopicParamsBuilder effectiveTopic(String effectiveTopic) {
- this.params.effectiveTopic = effectiveTopic;
+ this.params.setEffectiveTopic(effectiveTopic);
return this;
}
@@ -306,7 +326,6 @@ public class BusTopicParams {
this.params.serializationProvider = serializationProvider;
return this;
}
-
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
index 27ed5e7a..02626d34 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
@@ -5,6 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -159,28 +160,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
}
@Override
- protected boolean anyNullOrEmpty(String... args) {
- for (String arg : args) {
- if (arg == null || arg.isEmpty()) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- protected boolean allNullOrEmpty(String... args) {
- for (String arg : args) {
- if (!(arg == null || arg.isEmpty())) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
public String toString() {
return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
+ "]";
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
index 6574d408..f605de92 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,12 +33,12 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+ private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
- protected Map<String, String> additionalProps = null;
+ protected Map<String, String> additionalProps;
/**
- * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
+ * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below
* attributes.
*
* <p>servers list of KAFKA servers available for publishing
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
index 2a651ee7..713b4fd1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
/**
- * This topic source implementation specializes in reading messages over an Kafka Bus topic source and
+ * This topic source implementation specializes in reading messages over a Kafka Bus topic source and
* notifying its listeners.
*/
public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
index 3372e0aa..c63fbcc2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
@@ -117,8 +117,8 @@ public abstract class TopicBase implements Topic {
}
this.servers = servers;
- this.topic = topic;
- this.effectiveTopic = effectiveTopicCopy;
+ this.topic = topic.toLowerCase();
+ this.effectiveTopic = effectiveTopicCopy.toLowerCase();
}
@Override