From 7e934a6e435c62b1188b44ba5bdc3985328a85fc Mon Sep 17 00:00:00 2001 From: "adheli.tavares" Date: Tue, 2 Apr 2024 15:18:20 +0100 Subject: Dependency management update - including dependencies to pom.xml files only where they are used, avoiding extra dependencies being added in all packages. - removal of unused UEB topic. Issue-ID: POLICY-4945 Change-Id: Ifc0212af2bc938e357e1addebcec591f9d6cfc14 Signed-off-by: adheli.tavares --- .../policy/common/endpoints/event/comm/Topic.java | 4 - .../common/endpoints/event/comm/TopicEndpoint.java | 40 ---- .../endpoints/event/comm/TopicEndpointProxy.java | 68 +------ .../event/comm/bus/IndexedUebTopicSinkFactory.java | 202 ------------------- .../comm/bus/IndexedUebTopicSourceFactory.java | 216 --------------------- .../event/comm/bus/UebTopicFactories.java | 41 ---- .../endpoints/event/comm/bus/UebTopicSink.java | 28 --- .../event/comm/bus/UebTopicSinkFactory.java | 92 --------- .../endpoints/event/comm/bus/UebTopicSource.java | 29 --- .../event/comm/bus/UebTopicSourceFactory.java | 104 ---------- .../event/comm/bus/internal/BusConsumer.java | 97 --------- .../event/comm/bus/internal/BusPublisher.java | 89 --------- .../comm/bus/internal/InlineUebTopicSink.java | 87 --------- .../bus/internal/SingleThreadedUebTopicSource.java | 74 ------- .../event/comm/client/TopicSinkClient.java | 4 +- 15 files changed, 6 insertions(+), 1169 deletions(-) delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java (limited to 'policy-endpoints/src/main/java/org/onap/policy/common') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java index d09e7353..ce8e2387 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java @@ -36,10 +36,6 @@ public interface Topic extends TopicRegisterable, Startable, Lockable { * Underlying Communication infrastructure Types. */ enum CommInfrastructure { - /** - * UEB Communication Infrastructure. - */ - UEB, /** * KAFKA Communication Infrastructure. */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index 8cae5bd1..bf261def 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -29,8 +29,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.endpoints.parameters.TopicParameters; @@ -128,18 +126,6 @@ public interface TopicEndpoint extends Startable, Lockable { */ TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); - /** - * Get the UEB Topic Source for the given topic name. - * - * @param topicName the topic name - * - * @return the UEB Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource getUebTopicSource(String topicName); - /** * Get the Noop Source for the given topic name. * @@ -197,18 +183,6 @@ public interface TopicEndpoint extends Startable, Lockable { */ TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); - /** - * Get the UEB Topic Source for the given topic name. - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSink getUebTopicSink(String topicName); - /** * Get the no-op Topic Sink for the given topic name. * @@ -233,13 +207,6 @@ public interface TopicEndpoint extends Startable, Lockable { */ KafkaTopicSink getKafkaTopicSink(String topicName); - /** - * Gets only the UEB Topic Sources. - * - * @return the UEB Topic Source List - */ - List getUebTopicSources(); - /** * Gets only the KAFKA Topic Sources. * @@ -254,13 +221,6 @@ public interface TopicEndpoint extends Startable, Lockable { */ List getNoopTopicSources(); - /** - * Gets only the UEB Topic Sinks. - * - * @return the UEB Topic Sink List - */ - List getUebTopicSinks(); - /** * Gets only the KAFKA Topic Sinks. * diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 4ec8eb54..d9e55ddd 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -34,9 +34,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; @@ -90,9 +87,6 @@ class TopicEndpointProxy implements TopicEndpoint { for (TopicParameters param : paramList) { switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { - case UEB: - sources.add(UebTopicFactories.getSourceFactory().build(param)); - break; case KAFKA: sources.add(KafkaTopicFactories.getSourceFactory().build(param)); break; @@ -114,13 +108,11 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List addTopicSources(Properties properties) { - // 1. Create UEB Sources - // 2. Create KAFKA Sources - // 3. Create NOOP Sources + // 1. Create KAFKA Sources + // 2. Create NOOP Sources List sources = new ArrayList<>(); - sources.addAll(UebTopicFactories.getSourceFactory().build(properties)); sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); @@ -141,9 +133,6 @@ class TopicEndpointProxy implements TopicEndpoint { for (TopicParameters param : paramList) { switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { - case UEB: - sinks.add(UebTopicFactories.getSinkFactory().build(param)); - break; case KAFKA: sinks.add(KafkaTopicFactories.getSinkFactory().build(param)); break; @@ -164,13 +153,11 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create KAFKA Sinks - // 3. Create NOOP Sinks + // 1. Create KAFKA Sinks + // 2. Create NOOP Sinks final List sinks = new ArrayList<>(); - sinks.addAll(UebTopicFactories.getSinkFactory().build(properties)); sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); @@ -190,7 +177,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List sources = new ArrayList<>(); - sources.addAll(UebTopicFactories.getSourceFactory().inventory()); sources.addAll(KafkaTopicFactories.getSourceFactory().inventory()); sources.addAll(NoopTopicFactories.getSourceFactory().inventory()); @@ -207,12 +193,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List sources = new ArrayList<>(); topicNames.forEach(topic -> { - try { - sources.add(Objects.requireNonNull(this.getUebTopicSource(topic))); - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - try { sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic))); } catch (final Exception e) { @@ -234,7 +214,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List sinks = new ArrayList<>(); - sinks.addAll(UebTopicFactories.getSinkFactory().inventory()); sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory()); sinks.addAll(NoopTopicFactories.getSinkFactory().inventory()); @@ -250,12 +229,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List sinks = new ArrayList<>(); for (final String topic : topicNames) { - try { - sinks.add(Objects.requireNonNull(this.getUebTopicSink(topic))); - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - try { sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic))); } catch (final Exception e) { @@ -279,12 +252,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List sinks = new ArrayList<>(); - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - try { sinks.add(this.getKafkaTopicSink(topicName)); } catch (final Exception e) { @@ -300,12 +267,6 @@ class TopicEndpointProxy implements TopicEndpoint { return sinks; } - @GsonJsonIgnore - @Override - public List getUebTopicSources() { - return UebTopicFactories.getSourceFactory().inventory(); - } - @GsonJsonIgnore @Override public List getKafkaTopicSources() { @@ -318,12 +279,6 @@ class TopicEndpointProxy implements TopicEndpoint { return NoopTopicFactories.getSourceFactory().inventory(); } - @GsonJsonIgnore - @Override - public List getUebTopicSinks() { - return UebTopicFactories.getSinkFactory().inventory(); - } - @Override @GsonJsonIgnore public List getKafkaTopicSinks() { @@ -411,9 +366,6 @@ class TopicEndpointProxy implements TopicEndpoint { public void shutdown() { this.stop(); - UebTopicFactories.getSourceFactory().destroy(); - UebTopicFactories.getSinkFactory().destroy(); - KafkaTopicFactories.getSourceFactory().destroy(); KafkaTopicFactories.getSinkFactory().destroy(); @@ -478,7 +430,6 @@ class TopicEndpointProxy implements TopicEndpoint { } return switch (commType) { - case UEB -> this.getUebTopicSource(topicName); case KAFKA -> this.getKafkaTopicSource(topicName); case NOOP -> this.getNoopTopicSource(topicName); default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); @@ -496,23 +447,12 @@ class TopicEndpointProxy implements TopicEndpoint { } return switch (commType) { - case UEB -> this.getUebTopicSink(topicName); case KAFKA -> this.getKafkaTopicSink(topicName); case NOOP -> this.getNoopTopicSink(topicName); default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); }; } - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicFactories.getSourceFactory().get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicFactories.getSinkFactory().get(topicName); - } - @Override public KafkaTopicSource getKafkaTopicSource(String topicName) { return KafkaTopicFactories.getSourceFactory().get(topicName); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java deleted file mode 100644 index b04fc078..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import com.google.re2j.Pattern; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.onap.policy.common.endpoints.utils.UebPropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of UEB Reader Topics indexed by topic name. - */ -class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { - private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * UEB Topic Name Index. - */ - protected HashMap uebTopicSinks = new HashMap<>(); - - @Override - public UebTopicSink build(BusTopicParams busTopicParams) { - - if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (StringUtils.isBlank(busTopicParams.getTopic())) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(busTopicParams.getTopic())) { - return uebTopicSinks.get(busTopicParams.getTopic()); - } - - UebTopicSink uebTopicWriter = makeSink(busTopicParams); - - if (busTopicParams.isManaged()) { - uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter); - } - - return uebTopicWriter; - } - } - - - @Override - public UebTopicSink build(List servers, String topic) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - - @Override - public List build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); - if (StringUtils.isBlank(writeTopics)) { - logger.info("{}: no topic for UEB Sink", this); - return new ArrayList<>(); - } - - List newUebTopicSinks = new ArrayList<>(); - synchronized (this) { - for (String topic : COMMA_SPACE_PAT.split(writeTopics)) { - addTopic(newUebTopicSinks, topic, properties); - } - return newUebTopicSinks; - } - } - - private void addTopic(List newUebTopicSinks, String topic, Properties properties) { - if (this.uebTopicSinks.containsKey(topic)) { - newUebTopicSinks.add(this.uebTopicSinks.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic; - - var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - return; - } - - UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) - .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) - .build()); - newUebTopicSinks.add(uebTopicWriter); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSink uebTopicWriter; - synchronized (this) { - if (!uebTopicSinks.containsKey(topic)) { - return; - } - - uebTopicWriter = uebTopicSinks.remove(topic); - } - - uebTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List writers = this.inventory(); - for (UebTopicSink writer : writers) { - writer.shutdown(); - } - - synchronized (this) { - this.uebTopicSinks.clear(); - } - } - - @Override - public UebTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); - } else { - throw new IllegalStateException("UebTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.uebTopicSinks.values()); - } - - /** - * Makes a new sink. - * - * @param busTopicParams parameters to use to configure the sink - * @return a new sink - */ - protected UebTopicSink makeSink(BusTopicParams busTopicParams) { - return new InlineUebTopicSink(busTopicParams); - } - - - @Override - public String toString() { - return "IndexedUebTopicSinkFactory " + uebTopicSinks.keySet(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java deleted file mode 100644 index 09500978..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import com.google.re2j.Pattern; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.onap.policy.common.endpoints.utils.UebPropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of UEB Source Topics indexed by topic name. - */ -class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { - private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); - - /** - * UEB Topic Name Index. - */ - protected HashMap uebTopicSources = new HashMap<>(); - - @Override - public UebTopicSource build(BusTopicParams busTopicParams) { - if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(busTopicParams.getTopic())) { - return uebTopicSources.get(busTopicParams.getTopic()); - } - - var uebTopicSource = makeSource(busTopicParams); - - if (busTopicParams.isManaged()) { - uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource); - } - - return uebTopicSource; - } - } - - @Override - public List build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); - if (StringUtils.isBlank(readTopics)) { - logger.info("{}: no topic for UEB Source", this); - return new ArrayList<>(); - } - - List newUebTopicSources = new ArrayList<>(); - synchronized (this) { - for (String topic : COMMA_SPACE_PAT.split(readTopics)) { - addTopic(newUebTopicSources, topic, properties); - } - } - return newUebTopicSources; - } - - @Override - public UebTopicSource build(List servers, String topic, String apiKey, String apiSecret) { - - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH) - .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(true).build()); - } - - @Override - public UebTopicSource build(List servers, String topic) { - return this.build(servers, topic, null, null); - } - - private void addTopic(List newUebTopicSources, String topic, Properties properties) { - if (this.uebTopicSources.containsKey(topic)) { - newUebTopicSources.add(this.uebTopicSources.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic; - - var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - return; - } - - var uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) - .consumerGroup(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) - .consumerInstance(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) - .fetchTimeout(props.getInteger( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, - PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)) - .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, - PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)) - .build()); - - newUebTopicSources.add(uebTopicSource); - } - - /** - * Makes a new source. - * - * @param busTopicParams parameters to use to configure the source - * @return a new source - */ - protected UebTopicSource makeSource(BusTopicParams busTopicParams) { - return new SingleThreadedUebTopicSource(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSource uebTopicSource; - - synchronized (this) { - if (!uebTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = uebTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - @Override - public void destroy() { - List readers = this.inventory(); - for (UebTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.uebTopicSources.clear(); - } - } - - @Override - public UebTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); - } else { - throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.uebTopicSources.values()); - } - - @Override - public String toString() { - return "IndexedUebTopicSourceFactory " + uebTopicSources.keySet(); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java deleted file mode 100644 index 721f2135..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import lombok.AccessLevel; -import lombok.Getter; -import lombok.NoArgsConstructor; - -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class UebTopicFactories { - - /** - * Factory for instantiation and management of sinks. - */ - @Getter - private static final UebTopicSinkFactory sinkFactory = new IndexedUebTopicSinkFactory(); - - /** - * Factory for instantiation and management of sources. - */ - @Getter - private static final UebTopicSourceFactory sourceFactory = new IndexedUebTopicSourceFactory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java deleted file mode 100644 index acfef6da..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java +++ /dev/null @@ -1,28 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -/** - * Topic Writer over UEB Infrastructure. - */ -public interface UebTopicSink extends BusTopicSink { - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java deleted file mode 100644 index 0cf095fd..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.List; -import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -/** - * UEB Topic Sink Factory. - */ -public interface UebTopicSinkFactory { - - /** - * Instantiates a new UEB Topic Writer. - * - * @param busTopicParams parameters object - * @return an UEB Topic Sink - */ - UebTopicSink build(BusTopicParams busTopicParams); - - /** - * Creates an UEB Topic Writer based on properties files. - * - * @param properties Properties containing initialization values - * - * @return an UEB Topic Writer - * @throws IllegalArgumentException if invalid parameters are present - */ - List build(Properties properties); - - /** - * Instantiates a new UEB Topic Writer. - * - * @param servers list of servers - * @param topic topic name - * - * @return an UEB Topic Writer - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSink build(List servers, String topic); - - /** - * Destroys an UEB Topic Writer based on a topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all UEB Topic Writers. - */ - void destroy(); - - /** - * gets an UEB Topic Writer based on topic name. - * - * @param topic the topic name - * - * @return an UEB Topic Writer with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the UEB Topic Reader is an incorrect state - */ - UebTopicSink get(String topic); - - /** - * Provides a snapshot of the UEB Topic Writers. - * - * @return a list of the UEB Topic Writers - */ - List inventory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java deleted file mode 100644 index 56534309..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java +++ /dev/null @@ -1,29 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -/** - * Topic Source for UEB Communication Infrastructure. - * - */ -public interface UebTopicSource extends BusTopicSource { - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java deleted file mode 100644 index beacee3b..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.List; -import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -/** - * UEB Topic Source Factory. - */ -public interface UebTopicSourceFactory { - - /** - * Creates an UEB Topic Source based on properties files. - * - * @param properties Properties containing initialization values - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - List build(Properties properties); - - /** - * Instantiates a new UEB Topic Source. - * - * @param busTopicParams parameters object - * @return an UEB Topic Source - */ - UebTopicSource build(BusTopicParams busTopicParams); - - /** - * Instantiates a new UEB Topic Source. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource build(List servers, String topic, String apiKey, String apiSecret); - - /** - * Instantiates a new UEB Topic Source. - * - * @param servers list of servers - * @param topic topic name - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource build(List servers, String topic); - - /** - * Destroys an UEB Topic Source based on a topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all UEB Topic Sources. - */ - void destroy(); - - /** - * Gets an UEB Topic Source based on topic name. - * - * @param topic the topic name - * @return an UEB Topic Source with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the UEB Topic Source is an incorrect state - */ - UebTopicSource get(String topic); - - /** - * Provides a snapshot of the UEB Topic Sources. - * - * @return a list of the UEB Topic Sources - */ - List inventory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 3c57d1ba..b46c2715 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -23,9 +23,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -33,9 +30,7 @@ import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; import java.io.IOException; -import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; -import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -137,98 +132,6 @@ public interface BusConsumer { } } - /** - * Cambria based consumer. - */ - public static class CambriaConsumerWrapper extends FetchingBusConsumer { - - /** - * logger. - */ - private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); - - /** - * Used to build the consumer. - */ - private final ConsumerBuilder builder; - - /** - * Cambria client. - */ - private final CambriaConsumer consumer; - - /** - * Cambria Consumer Wrapper. - * BusTopicParam object contains the following parameters - * servers - messaging bus hosts. - * topic - topic for messages - * apiKey - API Key - * apiSecret - API Secret - * consumerGroup - Consumer Group - * consumerInstance - Consumer Instance - * fetchTimeout - Fetch Timeout - * fetchLimit - Fetch Limit - * - * @param busTopicParams - The parameters for the bus topic - */ - public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - super(busTopicParams); - - this.builder = new CambriaClientBuilders.ConsumerBuilder(); - - builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) - .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) - .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); - - // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(fetchTimeout + 30000); - - if (busTopicParams.isUseHttps()) { - builder.usingHttps(); - - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.allowSelfSignedCertificates(); - } - } - - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable fetch() throws IOException { - try { - return this.consumer.fetch(); - } catch (final IOException e) { //NOSONAR - logger.error("{}: cannot fetch because of {}", this, e.getMessage()); - sleepAfterFetchFailure(); - throw e; - } - } - - @Override - public void close() { - super.close(); - this.consumer.close(); - } - - @Override - public String toString() { - return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; - } - } - /** * Kafka based consumer. */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index e2adde0d..1b57e48e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -23,19 +23,13 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.apiClient.http.HttpClient.ConnectionType; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; import java.util.Properties; import java.util.UUID; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,89 +54,6 @@ public interface BusPublisher { */ void close(); - /** - * Cambria based library publisher. - */ - class CambriaPublisherWrapper implements BusPublisher { - - private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); - - /** - * The actual Cambria publisher. - */ - @GsonJsonIgnore - protected CambriaBatchingPublisher publisher; - - /** - * Constructor. - * - * @param busTopicParams topic parameters - */ - public CambriaPublisherWrapper(BusTopicParams busTopicParams) { - - var builder = new CambriaClientBuilders.PublisherBuilder(); - - builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()); - - // Set read timeout to 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(30000); - - if (busTopicParams.isUseHttps()) { - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION); - } else { - builder.withConnectionType(ConnectionType.HTTPS); - } - } - - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); - } - - try { - this.publisher = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public boolean send(String partitionId, String message) { - if (message == null) { - throw new IllegalArgumentException(NO_MESSAGE_PROVIDED); - } - - try { - this.publisher.send(partitionId, message); - } catch (Exception e) { - logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); - return false; - } - return true; - } - - @Override - public void close() { - logger.info(LOG_CLOSE, this); - - try { - this.publisher.close(); - } catch (Exception e) { - logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); - } - } - - @Override - public String toString() { - return "CambriaPublisherWrapper []"; - } - - } - /** * Kafka based library publisher. */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java deleted file mode 100644 index 896cb3bb..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This implementation publishes events for the associated UEB topic, inline with the calling - * thread. - */ -public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); - - /** - * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned - * attributes. - * - *

servers list of UEB servers available for publishing - * topic the topic to publish to - * apiKey the api key (optional) - * apiSecret the api secret (optional) - * partitionId the partition key (optional, autogenerated if not provided) - * useHttps does connection use HTTPS? - * allowTracing is tracing allowed? - * allowSelfSignedCerts are self-signed certificates allow - * @param busTopicParams contains attributes needed - * @throws IllegalArgumentException if invalid arguments are detected - */ - public InlineUebTopicSink(BusTopicParams busTopicParams) { - super(busTopicParams); - } - - /** - * Instantiation of internal resources. - */ - @Override - public void init() { - - this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .useHttps(this.useHttps) - .allowTracing(this.allowTracing) - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - logger.info("{}: UEB SINK created", this); - } - - @Override - public String toString() { - return "InlineUebTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" - + super.toString() + "]"; - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java deleted file mode 100644 index ead04594..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; - -/** - * This topic source implementation specializes in reading messages over an UEB Bus topic source and - * notifying its listeners. - */ -public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource { - - /** - * Constructor. - * - * @param busTopicParams Parameters object containing all the required inputs - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) { - super(busTopicParams); - this.init(); - } - - /** - * Initialize the Cambria client. - */ - @Override - public void init() { - this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) - .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps) - .allowTracing(this.allowTracing) - .allowSelfSignedCerts(this.allowSelfSignedCerts).build()); - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; - } - - @Override - public String toString() { - return "SingleThreadedUebTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() - + ", toString()=" + super.toString() + "]"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java index 5f49ea34..0ccc8a75 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java @@ -3,7 +3,7 @@ * ONAP PAP * ================================================================================ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019, 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; /** * Client for sending messages to a Topic using TopicSink. */ +@Getter public class TopicSinkClient { private static final Logger logger = LoggerFactory.getLogger(TopicSinkClient.class); @@ -46,7 +47,6 @@ public class TopicSinkClient { /** * Where messages are published. */ - @Getter private final TopicSink sink; /** -- cgit 1.2.3-korg