aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java27
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java23
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java8
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java4
5 files changed, 32 insertions, 34 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
index f9537f52..d6fa21b6 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
* Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -168,6 +168,26 @@ public class BusTopicParams {
return additionalProps != null;
}
+ public void setEffectiveTopic(String effectiveTopic) {
+ this.effectiveTopic = topicToLowerCase(effectiveTopic);
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topicToLowerCase(topic);
+ }
+
+ public String getEffectiveTopic() {
+ return topicToLowerCase(effectiveTopic);
+ }
+
+ public String getTopic() {
+ return topicToLowerCase(topic);
+ }
+
+ private String topicToLowerCase(String topic) {
+ return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase();
+ }
+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public static class TopicParamsBuilder {
@@ -179,12 +199,12 @@ public class BusTopicParams {
}
public TopicParamsBuilder topic(String topic) {
- this.params.topic = topic;
+ this.params.setTopic(topic);
return this;
}
public TopicParamsBuilder effectiveTopic(String effectiveTopic) {
- this.params.effectiveTopic = effectiveTopic;
+ this.params.setEffectiveTopic(effectiveTopic);
return this;
}
@@ -306,7 +326,6 @@ public class BusTopicParams {
this.params.serializationProvider = serializationProvider;
return this;
}
-
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
index 27ed5e7a..02626d34 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
@@ -5,6 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -159,28 +160,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
}
@Override
- protected boolean anyNullOrEmpty(String... args) {
- for (String arg : args) {
- if (arg == null || arg.isEmpty()) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- protected boolean allNullOrEmpty(String... args) {
- for (String arg : args) {
- if (!(arg == null || arg.isEmpty())) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
public String toString() {
return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
+ "]";
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
index 6574d408..f605de92 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,12 +33,12 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+ private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
- protected Map<String, String> additionalProps = null;
+ protected Map<String, String> additionalProps;
/**
- * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
+ * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below
* attributes.
*
* <p>servers list of KAFKA servers available for publishing
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
index 2a651ee7..713b4fd1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
/**
- * This topic source implementation specializes in reading messages over an Kafka Bus topic source and
+ * This topic source implementation specializes in reading messages over a Kafka Bus topic source and
* notifying its listeners.
*/
public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
index 3372e0aa..c63fbcc2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
@@ -117,8 +117,8 @@ public abstract class TopicBase implements Topic {
}
this.servers = servers;
- this.topic = topic;
- this.effectiveTopic = effectiveTopicCopy;
+ this.topic = topic.toLowerCase();
+ this.effectiveTopic = effectiveTopicCopy.toLowerCase();
}
@Override