aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy
diff options
context:
space:
mode:
authoradheli.tavares <adheli.tavares@est.tech>2024-04-02 15:18:20 +0100
committeradheli.tavares <adheli.tavares@est.tech>2024-04-10 13:28:45 +0100
commit7e934a6e435c62b1188b44ba5bdc3985328a85fc (patch)
treefb3379a6710e46bc14423475fc0b3c9e4a47275f /policy-endpoints/src/main/java/org/onap/policy
parent90901c96e348e8ee65f218b90e46a25d4b85f96d (diff)
Dependency management update
- including dependencies to pom.xml files only where they are used, avoiding extra dependencies being added in all packages. - removal of unused UEB topic. Issue-ID: POLICY-4945 Change-Id: Ifc0212af2bc938e357e1addebcec591f9d6cfc14 Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java40
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java68
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java202
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java216
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java41
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java28
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java92
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java29
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java104
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java97
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java89
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java87
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java74
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java4
15 files changed, 6 insertions, 1169 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
index d09e7353..ce8e2387 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
@@ -37,10 +37,6 @@ public interface Topic extends TopicRegisterable, Startable, Lockable {
*/
enum CommInfrastructure {
/**
- * UEB Communication Infrastructure.
- */
- UEB,
- /**
* KAFKA Communication Infrastructure.
*/
KAFKA,
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
index 8cae5bd1..bf261def 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
@@ -29,8 +29,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
@@ -129,18 +127,6 @@ public interface TopicEndpoint extends Startable, Lockable {
TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName);
/**
- * Get the UEB Topic Source for the given topic name.
- *
- * @param topicName the topic name
- *
- * @return the UEB Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSource getUebTopicSource(String topicName);
-
- /**
* Get the Noop Source for the given topic name.
*
* @param topicName the topic name.
@@ -198,18 +184,6 @@ public interface TopicEndpoint extends Startable, Lockable {
TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName);
/**
- * Get the UEB Topic Source for the given topic name.
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSink getUebTopicSink(String topicName);
-
- /**
* Get the no-op Topic Sink for the given topic name.
*
* @param topicName the topic name
@@ -234,13 +208,6 @@ public interface TopicEndpoint extends Startable, Lockable {
KafkaTopicSink getKafkaTopicSink(String topicName);
/**
- * Gets only the UEB Topic Sources.
- *
- * @return the UEB Topic Source List
- */
- List<UebTopicSource> getUebTopicSources();
-
- /**
* Gets only the KAFKA Topic Sources.
*
* @return the KAFKA Topic Source List
@@ -255,13 +222,6 @@ public interface TopicEndpoint extends Startable, Lockable {
List<NoopTopicSource> getNoopTopicSources();
/**
- * Gets only the UEB Topic Sinks.
- *
- * @return the UEB Topic Sink List
- */
- List<UebTopicSink> getUebTopicSinks();
-
- /**
* Gets only the KAFKA Topic Sinks.
*
* @return the KAFKA Topic Sinks List
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
index 4ec8eb54..d9e55ddd 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
@@ -34,9 +34,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
@@ -90,9 +87,6 @@ class TopicEndpointProxy implements TopicEndpoint {
for (TopicParameters param : paramList) {
switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
- case UEB:
- sources.add(UebTopicFactories.getSourceFactory().build(param));
- break;
case KAFKA:
sources.add(KafkaTopicFactories.getSourceFactory().build(param));
break;
@@ -114,13 +108,11 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSource> addTopicSources(Properties properties) {
- // 1. Create UEB Sources
- // 2. Create KAFKA Sources
- // 3. Create NOOP Sources
+ // 1. Create KAFKA Sources
+ // 2. Create NOOP Sources
List<TopicSource> sources = new ArrayList<>();
- sources.addAll(UebTopicFactories.getSourceFactory().build(properties));
sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties));
sources.addAll(NoopTopicFactories.getSourceFactory().build(properties));
@@ -141,9 +133,6 @@ class TopicEndpointProxy implements TopicEndpoint {
for (TopicParameters param : paramList) {
switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
- case UEB:
- sinks.add(UebTopicFactories.getSinkFactory().build(param));
- break;
case KAFKA:
sinks.add(KafkaTopicFactories.getSinkFactory().build(param));
break;
@@ -164,13 +153,11 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSink> addTopicSinks(Properties properties) {
- // 1. Create UEB Sinks
- // 2. Create KAFKA Sinks
- // 3. Create NOOP Sinks
+ // 1. Create KAFKA Sinks
+ // 2. Create NOOP Sinks
final List<TopicSink> sinks = new ArrayList<>();
- sinks.addAll(UebTopicFactories.getSinkFactory().build(properties));
sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties));
sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties));
@@ -190,7 +177,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSource> sources = new ArrayList<>();
- sources.addAll(UebTopicFactories.getSourceFactory().inventory());
sources.addAll(KafkaTopicFactories.getSourceFactory().inventory());
sources.addAll(NoopTopicFactories.getSourceFactory().inventory());
@@ -208,12 +194,6 @@ class TopicEndpointProxy implements TopicEndpoint {
topicNames.forEach(topic -> {
try {
- sources.add(Objects.requireNonNull(this.getUebTopicSource(topic)));
- } catch (final Exception e) {
- logger.debug("No UEB source for topic: {}", topic, e);
- }
-
- try {
sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic)));
} catch (final Exception e) {
logger.debug("No KAFKA source for topic: {}", topic, e);
@@ -234,7 +214,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
- sinks.addAll(UebTopicFactories.getSinkFactory().inventory());
sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory());
sinks.addAll(NoopTopicFactories.getSinkFactory().inventory());
@@ -251,12 +230,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
for (final String topic : topicNames) {
try {
- sinks.add(Objects.requireNonNull(this.getUebTopicSink(topic)));
- } catch (final Exception e) {
- logger.debug("No UEB sink for topic: {}", topic, e);
- }
-
- try {
sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic)));
} catch (final Exception e) {
logger.debug("No KAFKA sink for topic: {}", topic, e);
@@ -280,12 +253,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
try {
- sinks.add(this.getUebTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
sinks.add(this.getKafkaTopicSink(topicName));
} catch (final Exception e) {
logNoSink(topicName, e);
@@ -302,12 +269,6 @@ class TopicEndpointProxy implements TopicEndpoint {
@GsonJsonIgnore
@Override
- public List<UebTopicSource> getUebTopicSources() {
- return UebTopicFactories.getSourceFactory().inventory();
- }
-
- @GsonJsonIgnore
- @Override
public List<KafkaTopicSource> getKafkaTopicSources() {
return KafkaTopicFactories.getSourceFactory().inventory();
}
@@ -318,12 +279,6 @@ class TopicEndpointProxy implements TopicEndpoint {
return NoopTopicFactories.getSourceFactory().inventory();
}
- @GsonJsonIgnore
- @Override
- public List<UebTopicSink> getUebTopicSinks() {
- return UebTopicFactories.getSinkFactory().inventory();
- }
-
@Override
@GsonJsonIgnore
public List<KafkaTopicSink> getKafkaTopicSinks() {
@@ -411,9 +366,6 @@ class TopicEndpointProxy implements TopicEndpoint {
public void shutdown() {
this.stop();
- UebTopicFactories.getSourceFactory().destroy();
- UebTopicFactories.getSinkFactory().destroy();
-
KafkaTopicFactories.getSourceFactory().destroy();
KafkaTopicFactories.getSinkFactory().destroy();
@@ -478,7 +430,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
return switch (commType) {
- case UEB -> this.getUebTopicSource(topicName);
case KAFKA -> this.getKafkaTopicSource(topicName);
case NOOP -> this.getNoopTopicSource(topicName);
default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
@@ -496,7 +447,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
return switch (commType) {
- case UEB -> this.getUebTopicSink(topicName);
case KAFKA -> this.getKafkaTopicSink(topicName);
case NOOP -> this.getNoopTopicSink(topicName);
default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
@@ -504,16 +454,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
@Override
- public UebTopicSource getUebTopicSource(String topicName) {
- return UebTopicFactories.getSourceFactory().get(topicName);
- }
-
- @Override
- public UebTopicSink getUebTopicSink(String topicName) {
- return UebTopicFactories.getSinkFactory().get(topicName);
- }
-
- @Override
public KafkaTopicSource getKafkaTopicSource(String topicName) {
return KafkaTopicFactories.getSourceFactory().get(topicName);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java
deleted file mode 100644
index b04fc078..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import com.google.re2j.Pattern;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.onap.policy.common.endpoints.utils.PropertyUtils;
-import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory of UEB Reader Topics indexed by topic name.
- */
-class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
- private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger.
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
-
- /**
- * UEB Topic Name Index.
- */
- protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
-
- @Override
- public UebTopicSink build(BusTopicParams busTopicParams) {
-
- if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
- throw new IllegalArgumentException("UEB Server(s) must be provided");
- }
-
- if (StringUtils.isBlank(busTopicParams.getTopic())) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
- return uebTopicSinks.get(busTopicParams.getTopic());
- }
-
- UebTopicSink uebTopicWriter = makeSink(busTopicParams);
-
- if (busTopicParams.isManaged()) {
- uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
- }
-
- return uebTopicWriter;
- }
- }
-
-
- @Override
- public UebTopicSink build(List<String> servers, String topic) {
- return this.build(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .managed(true)
- .useHttps(false)
- .allowSelfSignedCerts(false)
- .build());
- }
-
-
- @Override
- public List<UebTopicSink> build(Properties properties) {
-
- String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
- if (StringUtils.isBlank(writeTopics)) {
- logger.info("{}: no topic for UEB Sink", this);
- return new ArrayList<>();
- }
-
- List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
- synchronized (this) {
- for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
- addTopic(newUebTopicSinks, topic, properties);
- }
- return newUebTopicSinks;
- }
- }
-
- private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) {
- if (this.uebTopicSinks.containsKey(topic)) {
- newUebTopicSinks.add(this.uebTopicSinks.get(topic));
- return;
- }
-
- String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic;
-
- var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
-
- String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (StringUtils.isBlank(servers)) {
- logger.error("{}: no UEB servers configured for sink {}", this, topic);
- return;
- }
-
- UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
- .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
- .build());
- newUebTopicSinks.add(uebTopicWriter);
- }
-
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- UebTopicSink uebTopicWriter;
- synchronized (this) {
- if (!uebTopicSinks.containsKey(topic)) {
- return;
- }
-
- uebTopicWriter = uebTopicSinks.remove(topic);
- }
-
- uebTopicWriter.shutdown();
- }
-
- @Override
- public void destroy() {
- List<UebTopicSink> writers = this.inventory();
- for (UebTopicSink writer : writers) {
- writer.shutdown();
- }
-
- synchronized (this) {
- this.uebTopicSinks.clear();
- }
- }
-
- @Override
- public UebTopicSink get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSinks.containsKey(topic)) {
- return uebTopicSinks.get(topic);
- } else {
- throw new IllegalStateException("UebTopicSink for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<UebTopicSink> inventory() {
- return new ArrayList<>(this.uebTopicSinks.values());
- }
-
- /**
- * Makes a new sink.
- *
- * @param busTopicParams parameters to use to configure the sink
- * @return a new sink
- */
- protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
- return new InlineUebTopicSink(busTopicParams);
- }
-
-
- @Override
- public String toString() {
- return "IndexedUebTopicSinkFactory " + uebTopicSinks.keySet();
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
deleted file mode 100644
index 09500978..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import com.google.re2j.Pattern;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.onap.policy.common.endpoints.utils.PropertyUtils;
-import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory of UEB Source Topics indexed by topic name.
- */
-class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
- private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger.
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
-
- /**
- * UEB Topic Name Index.
- */
- protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
-
- @Override
- public UebTopicSource build(BusTopicParams busTopicParams) {
- if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
- throw new IllegalArgumentException("UEB Server(s) must be provided");
- }
-
- if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
- return uebTopicSources.get(busTopicParams.getTopic());
- }
-
- var uebTopicSource = makeSource(busTopicParams);
-
- if (busTopicParams.isManaged()) {
- uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
- }
-
- return uebTopicSource;
- }
- }
-
- @Override
- public List<UebTopicSource> build(Properties properties) {
-
- String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
- if (StringUtils.isBlank(readTopics)) {
- logger.info("{}: no topic for UEB Source", this);
- return new ArrayList<>();
- }
-
- List<UebTopicSource> newUebTopicSources = new ArrayList<>();
- synchronized (this) {
- for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
- addTopic(newUebTopicSources, topic, properties);
- }
- }
- return newUebTopicSources;
- }
-
- @Override
- public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
-
- return this.build(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
- .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
- .managed(true)
- .useHttps(false)
- .allowSelfSignedCerts(true).build());
- }
-
- @Override
- public UebTopicSource build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null);
- }
-
- private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) {
- if (this.uebTopicSources.containsKey(topic)) {
- newUebTopicSources.add(this.uebTopicSources.get(topic));
- return;
- }
-
- String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic;
-
- var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
-
- String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (StringUtils.isBlank(servers)) {
- logger.error("{}: no UEB servers configured for sink {}", this, topic);
- return;
- }
-
- var uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
- .consumerGroup(props.getString(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
- .consumerInstance(props.getString(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
- .fetchTimeout(props.getInteger(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
- PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
- .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
- PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
- .build());
-
- newUebTopicSources.add(uebTopicSource);
- }
-
- /**
- * Makes a new source.
- *
- * @param busTopicParams parameters to use to configure the source
- * @return a new source
- */
- protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
- return new SingleThreadedUebTopicSource(busTopicParams);
- }
-
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- UebTopicSource uebTopicSource;
-
- synchronized (this) {
- if (!uebTopicSources.containsKey(topic)) {
- return;
- }
-
- uebTopicSource = uebTopicSources.remove(topic);
- }
-
- uebTopicSource.shutdown();
- }
-
- @Override
- public void destroy() {
- List<UebTopicSource> readers = this.inventory();
- for (UebTopicSource reader : readers) {
- reader.shutdown();
- }
-
- synchronized (this) {
- this.uebTopicSources.clear();
- }
- }
-
- @Override
- public UebTopicSource get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSources.containsKey(topic)) {
- return uebTopicSources.get(topic);
- } else {
- throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<UebTopicSource> inventory() {
- return new ArrayList<>(this.uebTopicSources.values());
- }
-
- @Override
- public String toString() {
- return "IndexedUebTopicSourceFactory " + uebTopicSources.keySet();
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java
deleted file mode 100644
index 721f2135..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class UebTopicFactories {
-
- /**
- * Factory for instantiation and management of sinks.
- */
- @Getter
- private static final UebTopicSinkFactory sinkFactory = new IndexedUebTopicSinkFactory();
-
- /**
- * Factory for instantiation and management of sources.
- */
- @Getter
- private static final UebTopicSourceFactory sourceFactory = new IndexedUebTopicSourceFactory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
deleted file mode 100644
index acfef6da..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-/**
- * Topic Writer over UEB Infrastructure.
- */
-public interface UebTopicSink extends BusTopicSink {
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
deleted file mode 100644
index 0cf095fd..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import java.util.List;
-import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-
-/**
- * UEB Topic Sink Factory.
- */
-public interface UebTopicSinkFactory {
-
- /**
- * Instantiates a new UEB Topic Writer.
- *
- * @param busTopicParams parameters object
- * @return an UEB Topic Sink
- */
- UebTopicSink build(BusTopicParams busTopicParams);
-
- /**
- * Creates an UEB Topic Writer based on properties files.
- *
- * @param properties Properties containing initialization values
- *
- * @return an UEB Topic Writer
- * @throws IllegalArgumentException if invalid parameters are present
- */
- List<UebTopicSink> build(Properties properties);
-
- /**
- * Instantiates a new UEB Topic Writer.
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an UEB Topic Writer
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSink build(List<String> servers, String topic);
-
- /**
- * Destroys an UEB Topic Writer based on a topic.
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- void destroy(String topic);
-
- /**
- * Destroys all UEB Topic Writers.
- */
- void destroy();
-
- /**
- * gets an UEB Topic Writer based on topic name.
- *
- * @param topic the topic name
- *
- * @return an UEB Topic Writer with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the UEB Topic Reader is an incorrect state
- */
- UebTopicSink get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Writers.
- *
- * @return a list of the UEB Topic Writers
- */
- List<UebTopicSink> inventory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
deleted file mode 100644
index 56534309..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-/**
- * Topic Source for UEB Communication Infrastructure.
- *
- */
-public interface UebTopicSource extends BusTopicSource {
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
deleted file mode 100644
index beacee3b..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import java.util.List;
-import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-
-/**
- * UEB Topic Source Factory.
- */
-public interface UebTopicSourceFactory {
-
- /**
- * Creates an UEB Topic Source based on properties files.
- *
- * @param properties Properties containing initialization values
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- List<UebTopicSource> build(Properties properties);
-
- /**
- * Instantiates a new UEB Topic Source.
- *
- * @param busTopicParams parameters object
- * @return an UEB Topic Source
- */
- UebTopicSource build(BusTopicParams busTopicParams);
-
- /**
- * Instantiates a new UEB Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
-
- /**
- * Instantiates a new UEB Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSource build(List<String> servers, String topic);
-
- /**
- * Destroys an UEB Topic Source based on a topic.
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- void destroy(String topic);
-
- /**
- * Destroys all UEB Topic Sources.
- */
- void destroy();
-
- /**
- * Gets an UEB Topic Source based on topic name.
- *
- * @param topic the topic name
- * @return an UEB Topic Source with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the UEB Topic Source is an incorrect state
- */
- UebTopicSource get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Sources.
- *
- * @return a list of the UEB Topic Sources
- */
- List<UebTopicSource> inventory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 3c57d1ba..b46c2715 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -23,9 +23,6 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
@@ -33,9 +30,7 @@ import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -138,98 +133,6 @@ public interface BusConsumer {
}
/**
- * Cambria based consumer.
- */
- public static class CambriaConsumerWrapper extends FetchingBusConsumer {
-
- /**
- * logger.
- */
- private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
-
- /**
- * Used to build the consumer.
- */
- private final ConsumerBuilder builder;
-
- /**
- * Cambria client.
- */
- private final CambriaConsumer consumer;
-
- /**
- * Cambria Consumer Wrapper.
- * BusTopicParam object contains the following parameters
- * servers - messaging bus hosts.
- * topic - topic for messages
- * apiKey - API Key
- * apiSecret - API Secret
- * consumerGroup - Consumer Group
- * consumerInstance - Consumer Instance
- * fetchTimeout - Fetch Timeout
- * fetchLimit - Fetch Limit
- *
- * @param busTopicParams - The parameters for the bus topic
- */
- public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
- super(busTopicParams);
-
- this.builder = new CambriaClientBuilders.ConsumerBuilder();
-
- builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
- .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
- .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
-
- // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(fetchTimeout + 30000);
-
- if (busTopicParams.isUseHttps()) {
- builder.usingHttps();
-
- if (busTopicParams.isAllowSelfSignedCerts()) {
- builder.allowSelfSignedCertificates();
- }
- }
-
- if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
- builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
- }
-
- if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
- builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
- }
-
- try {
- this.consumer = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- try {
- return this.consumer.fetch();
- } catch (final IOException e) { //NOSONAR
- logger.error("{}: cannot fetch because of {}", this, e.getMessage());
- sleepAfterFetchFailure();
- throw e;
- }
- }
-
- @Override
- public void close() {
- super.close();
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
- }
- }
-
- /**
* Kafka based consumer.
*/
class KafkaConsumerWrapper extends FetchingBusConsumer {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index e2adde0d..1b57e48e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -23,19 +23,13 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,89 +55,6 @@ public interface BusPublisher {
void close();
/**
- * Cambria based library publisher.
- */
- class CambriaPublisherWrapper implements BusPublisher {
-
- private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
-
- /**
- * The actual Cambria publisher.
- */
- @GsonJsonIgnore
- protected CambriaBatchingPublisher publisher;
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic parameters
- */
- public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
-
- var builder = new CambriaClientBuilders.PublisherBuilder();
-
- builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
-
- // Set read timeout to 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(30000);
-
- if (busTopicParams.isUseHttps()) {
- if (busTopicParams.isAllowSelfSignedCerts()) {
- builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
- } else {
- builder.withConnectionType(ConnectionType.HTTPS);
- }
- }
-
- if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
- builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
- }
-
- if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
- builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
- }
-
- try {
- this.publisher = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public boolean send(String partitionId, String message) {
- if (message == null) {
- throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
- }
-
- try {
- this.publisher.send(partitionId, message);
- } catch (Exception e) {
- logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
- return false;
- }
- return true;
- }
-
- @Override
- public void close() {
- logger.info(LOG_CLOSE, this);
-
- try {
- this.publisher.close();
- } catch (Exception e) {
- logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
- }
- }
-
- @Override
- public String toString() {
- return "CambriaPublisherWrapper []";
- }
-
- }
-
- /**
* Kafka based library publisher.
*/
class KafkaPublisherWrapper implements BusPublisher {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
deleted file mode 100644
index 896cb3bb..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This implementation publishes events for the associated UEB topic, inline with the calling
- * thread.
- */
-public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink {
-
- /**
- * Logger.
- */
- private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
-
- /**
- * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned
- * attributes.
- *
- * <p>servers list of UEB servers available for publishing
- * topic the topic to publish to
- * apiKey the api key (optional)
- * apiSecret the api secret (optional)
- * partitionId the partition key (optional, autogenerated if not provided)
- * useHttps does connection use HTTPS?
- * allowTracing is tracing allowed?
- * allowSelfSignedCerts are self-signed certificates allow
- * @param busTopicParams contains attributes needed
- * @throws IllegalArgumentException if invalid arguments are detected
- */
- public InlineUebTopicSink(BusTopicParams busTopicParams) {
- super(busTopicParams);
- }
-
- /**
- * Instantiation of internal resources.
- */
- @Override
- public void init() {
-
- this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- logger.info("{}: UEB SINK created", this);
- }
-
- @Override
- public String toString() {
- return "InlineUebTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
- + super.toString() + "]";
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.UEB;
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
deleted file mode 100644
index ead04594..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
-
-/**
- * This topic source implementation specializes in reading messages over an UEB Bus topic source and
- * notifying its listeners.
- */
-public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource {
-
- /**
- * Constructor.
- *
- * @param busTopicParams Parameters object containing all the required inputs
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) {
- super(busTopicParams);
- this.init();
- }
-
- /**
- * Initialize the Cambria client.
- */
- @Override
- public void init() {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
- .fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing)
- .allowSelfSignedCerts(this.allowSelfSignedCerts).build());
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.UEB;
- }
-
- @Override
- public String toString() {
- return "SingleThreadedUebTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure()
- + ", toString()=" + super.toString() + "]";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
index 5f49ea34..0ccc8a75 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
@@ -3,7 +3,7 @@
* ONAP PAP
* ================================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
/**
* Client for sending messages to a Topic using TopicSink.
*/
+@Getter
public class TopicSinkClient {
private static final Logger logger = LoggerFactory.getLogger(TopicSinkClient.class);
@@ -46,7 +47,6 @@ public class TopicSinkClient {
/**
* Where messages are published.
*/
- @Getter
private final TopicSink sink;
/**