diff options
Diffstat (limited to 'policy-endpoints')
38 files changed, 137 insertions, 2352 deletions
diff --git a/policy-endpoints/pom.xml b/policy-endpoints/pom.xml index c6dd4b6e..9bc0c883 100644 --- a/policy-endpoints/pom.xml +++ b/policy-endpoints/pom.xml @@ -36,10 +36,6 @@ <name>policy-endpoints</name> <description>Endpoints</description> - <properties> - <cambria.version>1.2.1-oss</cambria.version> - </properties> - <dependencyManagement> <dependencies> <dependency> @@ -67,6 +63,7 @@ <groupId>org.onap.policy.common</groupId> <artifactId>utils</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.onap.policy.common</groupId> @@ -80,48 +77,18 @@ <scope>test</scope> </dependency> <dependency> - <groupId>com.att.nsa</groupId> - <artifactId>cambriaClient</artifactId> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.module</groupId> - <artifactId>jackson-module-jakarta-xmlbind-annotations</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.dataformat</groupId> - <artifactId>jackson-dataformat-yaml</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>org.glassfish.jaxb</groupId> <artifactId>jaxb-runtime</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>org.glassfish.jersey.containers</groupId> - <artifactId>jersey-container-servlet</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish.jersey.media</groupId> - <artifactId>jersey-media-json-jackson</artifactId> - <version>${version.jersey}</version> - <scope>compile</scope> + <artifactId>jersey-container-servlet-core</artifactId> </dependency> <dependency> <groupId>org.glassfish.jersey.core</groupId> @@ -132,22 +99,7 @@ <groupId>org.glassfish.jersey.inject</groupId> <artifactId>jersey-hk2</artifactId> <version>${version.jersey}</version> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents.client5</groupId> - <artifactId>httpclient5</artifactId> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents.core5</groupId> - <artifactId>httpcore5</artifactId> - </dependency> - <dependency> - <groupId>jakarta.activation</groupId> - <artifactId>jakarta.activation-api</artifactId> - </dependency> - <dependency> - <groupId>jakarta.inject</groupId> - <artifactId>jakarta.inject-api</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> @@ -158,32 +110,66 @@ <artifactId>jetty-security</artifactId> </dependency> <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-jexl3</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>io.swagger.core.v3</groupId> <artifactId>swagger-annotations</artifactId> + <scope>test</scope> </dependency> <dependency> <groupId>io.opentelemetry.instrumentation</groupId> <artifactId>opentelemetry-kafka-clients-2.6</artifactId> </dependency> <dependency> - <groupId>io.opentelemetry</groupId> - <artifactId>opentelemetry-exporter-otlp</artifactId> + <groupId>io.swagger.core.v3</groupId> + <artifactId>swagger-jaxrs2-jakarta</artifactId> + <scope>provided</scope> </dependency> <dependency> - <groupId>io.opentelemetry</groupId> - <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId> + <groupId>io.swagger.core.v3</groupId> + <artifactId>swagger-jaxrs2-servlet-initializer-v2-jakarta</artifactId> + <scope>provided</scope> </dependency> <dependency> - <groupId>com.sun.xml.bind</groupId> - <artifactId>jaxb-impl</artifactId> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_hotspot</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_servlet_jakarta</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-test</artifactId> + <scope>test</scope> </dependency> </dependencies> </project> diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java index d09e7353..ce8e2387 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java @@ -37,10 +37,6 @@ public interface Topic extends TopicRegisterable, Startable, Lockable { */ enum CommInfrastructure { /** - * UEB Communication Infrastructure. - */ - UEB, - /** * KAFKA Communication Infrastructure. */ KAFKA, diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index 8cae5bd1..bf261def 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -29,8 +29,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.endpoints.parameters.TopicParameters; @@ -129,18 +127,6 @@ public interface TopicEndpoint extends Startable, Lockable { TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); /** - * Get the UEB Topic Source for the given topic name. - * - * @param topicName the topic name - * - * @return the UEB Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource getUebTopicSource(String topicName); - - /** * Get the Noop Source for the given topic name. * * @param topicName the topic name. @@ -198,18 +184,6 @@ public interface TopicEndpoint extends Startable, Lockable { TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); /** - * Get the UEB Topic Source for the given topic name. - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSink getUebTopicSink(String topicName); - - /** * Get the no-op Topic Sink for the given topic name. * * @param topicName the topic name @@ -234,13 +208,6 @@ public interface TopicEndpoint extends Startable, Lockable { KafkaTopicSink getKafkaTopicSink(String topicName); /** - * Gets only the UEB Topic Sources. - * - * @return the UEB Topic Source List - */ - List<UebTopicSource> getUebTopicSources(); - - /** * Gets only the KAFKA Topic Sources. * * @return the KAFKA Topic Source List @@ -255,13 +222,6 @@ public interface TopicEndpoint extends Startable, Lockable { List<NoopTopicSource> getNoopTopicSources(); /** - * Gets only the UEB Topic Sinks. - * - * @return the UEB Topic Sink List - */ - List<UebTopicSink> getUebTopicSinks(); - - /** * Gets only the KAFKA Topic Sinks. * * @return the KAFKA Topic Sinks List diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 4ec8eb54..d9e55ddd 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -34,9 +34,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; @@ -90,9 +87,6 @@ class TopicEndpointProxy implements TopicEndpoint { for (TopicParameters param : paramList) { switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { - case UEB: - sources.add(UebTopicFactories.getSourceFactory().build(param)); - break; case KAFKA: sources.add(KafkaTopicFactories.getSourceFactory().build(param)); break; @@ -114,13 +108,11 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSource> addTopicSources(Properties properties) { - // 1. Create UEB Sources - // 2. Create KAFKA Sources - // 3. Create NOOP Sources + // 1. Create KAFKA Sources + // 2. Create NOOP Sources List<TopicSource> sources = new ArrayList<>(); - sources.addAll(UebTopicFactories.getSourceFactory().build(properties)); sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); @@ -141,9 +133,6 @@ class TopicEndpointProxy implements TopicEndpoint { for (TopicParameters param : paramList) { switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { - case UEB: - sinks.add(UebTopicFactories.getSinkFactory().build(param)); - break; case KAFKA: sinks.add(KafkaTopicFactories.getSinkFactory().build(param)); break; @@ -164,13 +153,11 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create KAFKA Sinks - // 3. Create NOOP Sinks + // 1. Create KAFKA Sinks + // 2. Create NOOP Sinks final List<TopicSink> sinks = new ArrayList<>(); - sinks.addAll(UebTopicFactories.getSinkFactory().build(properties)); sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); @@ -190,7 +177,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSource> sources = new ArrayList<>(); - sources.addAll(UebTopicFactories.getSourceFactory().inventory()); sources.addAll(KafkaTopicFactories.getSourceFactory().inventory()); sources.addAll(NoopTopicFactories.getSourceFactory().inventory()); @@ -208,12 +194,6 @@ class TopicEndpointProxy implements TopicEndpoint { topicNames.forEach(topic -> { try { - sources.add(Objects.requireNonNull(this.getUebTopicSource(topic))); - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - - try { sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic))); } catch (final Exception e) { logger.debug("No KAFKA source for topic: {}", topic, e); @@ -234,7 +214,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSink> sinks = new ArrayList<>(); - sinks.addAll(UebTopicFactories.getSinkFactory().inventory()); sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory()); sinks.addAll(NoopTopicFactories.getSinkFactory().inventory()); @@ -251,12 +230,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSink> sinks = new ArrayList<>(); for (final String topic : topicNames) { try { - sinks.add(Objects.requireNonNull(this.getUebTopicSink(topic))); - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic))); } catch (final Exception e) { logger.debug("No KAFKA sink for topic: {}", topic, e); @@ -280,12 +253,6 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSink> sinks = new ArrayList<>(); try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { sinks.add(this.getKafkaTopicSink(topicName)); } catch (final Exception e) { logNoSink(topicName, e); @@ -302,12 +269,6 @@ class TopicEndpointProxy implements TopicEndpoint { @GsonJsonIgnore @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicFactories.getSourceFactory().inventory(); - } - - @GsonJsonIgnore - @Override public List<KafkaTopicSource> getKafkaTopicSources() { return KafkaTopicFactories.getSourceFactory().inventory(); } @@ -318,12 +279,6 @@ class TopicEndpointProxy implements TopicEndpoint { return NoopTopicFactories.getSourceFactory().inventory(); } - @GsonJsonIgnore - @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicFactories.getSinkFactory().inventory(); - } - @Override @GsonJsonIgnore public List<KafkaTopicSink> getKafkaTopicSinks() { @@ -411,9 +366,6 @@ class TopicEndpointProxy implements TopicEndpoint { public void shutdown() { this.stop(); - UebTopicFactories.getSourceFactory().destroy(); - UebTopicFactories.getSinkFactory().destroy(); - KafkaTopicFactories.getSourceFactory().destroy(); KafkaTopicFactories.getSinkFactory().destroy(); @@ -478,7 +430,6 @@ class TopicEndpointProxy implements TopicEndpoint { } return switch (commType) { - case UEB -> this.getUebTopicSource(topicName); case KAFKA -> this.getKafkaTopicSource(topicName); case NOOP -> this.getNoopTopicSource(topicName); default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); @@ -496,7 +447,6 @@ class TopicEndpointProxy implements TopicEndpoint { } return switch (commType) { - case UEB -> this.getUebTopicSink(topicName); case KAFKA -> this.getKafkaTopicSink(topicName); case NOOP -> this.getNoopTopicSink(topicName); default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); @@ -504,16 +454,6 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicFactories.getSourceFactory().get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicFactories.getSinkFactory().get(topicName); - } - - @Override public KafkaTopicSource getKafkaTopicSource(String topicName) { return KafkaTopicFactories.getSourceFactory().get(topicName); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java deleted file mode 100644 index b04fc078..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import com.google.re2j.Pattern; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.onap.policy.common.endpoints.utils.UebPropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of UEB Reader Topics indexed by topic name. - */ -class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { - private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * UEB Topic Name Index. - */ - protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>(); - - @Override - public UebTopicSink build(BusTopicParams busTopicParams) { - - if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (StringUtils.isBlank(busTopicParams.getTopic())) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(busTopicParams.getTopic())) { - return uebTopicSinks.get(busTopicParams.getTopic()); - } - - UebTopicSink uebTopicWriter = makeSink(busTopicParams); - - if (busTopicParams.isManaged()) { - uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter); - } - - return uebTopicWriter; - } - } - - - @Override - public UebTopicSink build(List<String> servers, String topic) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - - @Override - public List<UebTopicSink> build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); - if (StringUtils.isBlank(writeTopics)) { - logger.info("{}: no topic for UEB Sink", this); - return new ArrayList<>(); - } - - List<UebTopicSink> newUebTopicSinks = new ArrayList<>(); - synchronized (this) { - for (String topic : COMMA_SPACE_PAT.split(writeTopics)) { - addTopic(newUebTopicSinks, topic, properties); - } - return newUebTopicSinks; - } - } - - private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) { - if (this.uebTopicSinks.containsKey(topic)) { - newUebTopicSinks.add(this.uebTopicSinks.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic; - - var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - return; - } - - UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) - .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) - .build()); - newUebTopicSinks.add(uebTopicWriter); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSink uebTopicWriter; - synchronized (this) { - if (!uebTopicSinks.containsKey(topic)) { - return; - } - - uebTopicWriter = uebTopicSinks.remove(topic); - } - - uebTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List<UebTopicSink> writers = this.inventory(); - for (UebTopicSink writer : writers) { - writer.shutdown(); - } - - synchronized (this) { - this.uebTopicSinks.clear(); - } - } - - @Override - public UebTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); - } else { - throw new IllegalStateException("UebTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<UebTopicSink> inventory() { - return new ArrayList<>(this.uebTopicSinks.values()); - } - - /** - * Makes a new sink. - * - * @param busTopicParams parameters to use to configure the sink - * @return a new sink - */ - protected UebTopicSink makeSink(BusTopicParams busTopicParams) { - return new InlineUebTopicSink(busTopicParams); - } - - - @Override - public String toString() { - return "IndexedUebTopicSinkFactory " + uebTopicSinks.keySet(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java deleted file mode 100644 index 09500978..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import com.google.re2j.Pattern; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.onap.policy.common.endpoints.utils.UebPropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of UEB Source Topics indexed by topic name. - */ -class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { - private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); - - /** - * UEB Topic Name Index. - */ - protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>(); - - @Override - public UebTopicSource build(BusTopicParams busTopicParams) { - if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(busTopicParams.getTopic())) { - return uebTopicSources.get(busTopicParams.getTopic()); - } - - var uebTopicSource = makeSource(busTopicParams); - - if (busTopicParams.isManaged()) { - uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource); - } - - return uebTopicSource; - } - } - - @Override - public List<UebTopicSource> build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); - if (StringUtils.isBlank(readTopics)) { - logger.info("{}: no topic for UEB Source", this); - return new ArrayList<>(); - } - - List<UebTopicSource> newUebTopicSources = new ArrayList<>(); - synchronized (this) { - for (String topic : COMMA_SPACE_PAT.split(readTopics)) { - addTopic(newUebTopicSources, topic, properties); - } - } - return newUebTopicSources; - } - - @Override - public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { - - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH) - .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(true).build()); - } - - @Override - public UebTopicSource build(List<String> servers, String topic) { - return this.build(servers, topic, null, null); - } - - private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) { - if (this.uebTopicSources.containsKey(topic)) { - newUebTopicSources.add(this.uebTopicSources.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic; - - var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - return; - } - - var uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) - .consumerGroup(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) - .consumerInstance(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) - .fetchTimeout(props.getInteger( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, - PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)) - .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, - PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)) - .build()); - - newUebTopicSources.add(uebTopicSource); - } - - /** - * Makes a new source. - * - * @param busTopicParams parameters to use to configure the source - * @return a new source - */ - protected UebTopicSource makeSource(BusTopicParams busTopicParams) { - return new SingleThreadedUebTopicSource(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSource uebTopicSource; - - synchronized (this) { - if (!uebTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = uebTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - @Override - public void destroy() { - List<UebTopicSource> readers = this.inventory(); - for (UebTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.uebTopicSources.clear(); - } - } - - @Override - public UebTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); - } else { - throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<UebTopicSource> inventory() { - return new ArrayList<>(this.uebTopicSources.values()); - } - - @Override - public String toString() { - return "IndexedUebTopicSourceFactory " + uebTopicSources.keySet(); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java deleted file mode 100644 index 721f2135..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import lombok.AccessLevel; -import lombok.Getter; -import lombok.NoArgsConstructor; - -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class UebTopicFactories { - - /** - * Factory for instantiation and management of sinks. - */ - @Getter - private static final UebTopicSinkFactory sinkFactory = new IndexedUebTopicSinkFactory(); - - /** - * Factory for instantiation and management of sources. - */ - @Getter - private static final UebTopicSourceFactory sourceFactory = new IndexedUebTopicSourceFactory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java deleted file mode 100644 index acfef6da..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java +++ /dev/null @@ -1,28 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -/** - * Topic Writer over UEB Infrastructure. - */ -public interface UebTopicSink extends BusTopicSink { - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java deleted file mode 100644 index 0cf095fd..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.List; -import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -/** - * UEB Topic Sink Factory. - */ -public interface UebTopicSinkFactory { - - /** - * Instantiates a new UEB Topic Writer. - * - * @param busTopicParams parameters object - * @return an UEB Topic Sink - */ - UebTopicSink build(BusTopicParams busTopicParams); - - /** - * Creates an UEB Topic Writer based on properties files. - * - * @param properties Properties containing initialization values - * - * @return an UEB Topic Writer - * @throws IllegalArgumentException if invalid parameters are present - */ - List<UebTopicSink> build(Properties properties); - - /** - * Instantiates a new UEB Topic Writer. - * - * @param servers list of servers - * @param topic topic name - * - * @return an UEB Topic Writer - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSink build(List<String> servers, String topic); - - /** - * Destroys an UEB Topic Writer based on a topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all UEB Topic Writers. - */ - void destroy(); - - /** - * gets an UEB Topic Writer based on topic name. - * - * @param topic the topic name - * - * @return an UEB Topic Writer with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the UEB Topic Reader is an incorrect state - */ - UebTopicSink get(String topic); - - /** - * Provides a snapshot of the UEB Topic Writers. - * - * @return a list of the UEB Topic Writers - */ - List<UebTopicSink> inventory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java deleted file mode 100644 index 56534309..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java +++ /dev/null @@ -1,29 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -/** - * Topic Source for UEB Communication Infrastructure. - * - */ -public interface UebTopicSource extends BusTopicSource { - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java deleted file mode 100644 index beacee3b..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.List; -import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -/** - * UEB Topic Source Factory. - */ -public interface UebTopicSourceFactory { - - /** - * Creates an UEB Topic Source based on properties files. - * - * @param properties Properties containing initialization values - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - List<UebTopicSource> build(Properties properties); - - /** - * Instantiates a new UEB Topic Source. - * - * @param busTopicParams parameters object - * @return an UEB Topic Source - */ - UebTopicSource build(BusTopicParams busTopicParams); - - /** - * Instantiates a new UEB Topic Source. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); - - /** - * Instantiates a new UEB Topic Source. - * - * @param servers list of servers - * @param topic topic name - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource build(List<String> servers, String topic); - - /** - * Destroys an UEB Topic Source based on a topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all UEB Topic Sources. - */ - void destroy(); - - /** - * Gets an UEB Topic Source based on topic name. - * - * @param topic the topic name - * @return an UEB Topic Source with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the UEB Topic Source is an incorrect state - */ - UebTopicSource get(String topic); - - /** - * Provides a snapshot of the UEB Topic Sources. - * - * @return a list of the UEB Topic Sources - */ - List<UebTopicSource> inventory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 3c57d1ba..b46c2715 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -23,9 +23,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -33,9 +30,7 @@ import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; import java.io.IOException; -import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; -import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -138,98 +133,6 @@ public interface BusConsumer { } /** - * Cambria based consumer. - */ - public static class CambriaConsumerWrapper extends FetchingBusConsumer { - - /** - * logger. - */ - private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); - - /** - * Used to build the consumer. - */ - private final ConsumerBuilder builder; - - /** - * Cambria client. - */ - private final CambriaConsumer consumer; - - /** - * Cambria Consumer Wrapper. - * BusTopicParam object contains the following parameters - * servers - messaging bus hosts. - * topic - topic for messages - * apiKey - API Key - * apiSecret - API Secret - * consumerGroup - Consumer Group - * consumerInstance - Consumer Instance - * fetchTimeout - Fetch Timeout - * fetchLimit - Fetch Limit - * - * @param busTopicParams - The parameters for the bus topic - */ - public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - super(busTopicParams); - - this.builder = new CambriaClientBuilders.ConsumerBuilder(); - - builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) - .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) - .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); - - // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(fetchTimeout + 30000); - - if (busTopicParams.isUseHttps()) { - builder.usingHttps(); - - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.allowSelfSignedCertificates(); - } - } - - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable<String> fetch() throws IOException { - try { - return this.consumer.fetch(); - } catch (final IOException e) { //NOSONAR - logger.error("{}: cannot fetch because of {}", this, e.getMessage()); - sleepAfterFetchFailure(); - throw e; - } - } - - @Override - public void close() { - super.close(); - this.consumer.close(); - } - - @Override - public String toString() { - return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; - } - } - - /** * Kafka based consumer. */ class KafkaConsumerWrapper extends FetchingBusConsumer { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index e2adde0d..1b57e48e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -23,19 +23,13 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.apiClient.http.HttpClient.ConnectionType; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; import java.util.Properties; import java.util.UUID; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,89 +55,6 @@ public interface BusPublisher { void close(); /** - * Cambria based library publisher. - */ - class CambriaPublisherWrapper implements BusPublisher { - - private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); - - /** - * The actual Cambria publisher. - */ - @GsonJsonIgnore - protected CambriaBatchingPublisher publisher; - - /** - * Constructor. - * - * @param busTopicParams topic parameters - */ - public CambriaPublisherWrapper(BusTopicParams busTopicParams) { - - var builder = new CambriaClientBuilders.PublisherBuilder(); - - builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()); - - // Set read timeout to 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(30000); - - if (busTopicParams.isUseHttps()) { - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION); - } else { - builder.withConnectionType(ConnectionType.HTTPS); - } - } - - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); - } - - try { - this.publisher = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public boolean send(String partitionId, String message) { - if (message == null) { - throw new IllegalArgumentException(NO_MESSAGE_PROVIDED); - } - - try { - this.publisher.send(partitionId, message); - } catch (Exception e) { - logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); - return false; - } - return true; - } - - @Override - public void close() { - logger.info(LOG_CLOSE, this); - - try { - this.publisher.close(); - } catch (Exception e) { - logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); - } - } - - @Override - public String toString() { - return "CambriaPublisherWrapper []"; - } - - } - - /** * Kafka based library publisher. */ class KafkaPublisherWrapper implements BusPublisher { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java deleted file mode 100644 index 896cb3bb..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This implementation publishes events for the associated UEB topic, inline with the calling - * thread. - */ -public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); - - /** - * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned - * attributes. - * - * <p>servers list of UEB servers available for publishing - * topic the topic to publish to - * apiKey the api key (optional) - * apiSecret the api secret (optional) - * partitionId the partition key (optional, autogenerated if not provided) - * useHttps does connection use HTTPS? - * allowTracing is tracing allowed? - * allowSelfSignedCerts are self-signed certificates allow - * @param busTopicParams contains attributes needed - * @throws IllegalArgumentException if invalid arguments are detected - */ - public InlineUebTopicSink(BusTopicParams busTopicParams) { - super(busTopicParams); - } - - /** - * Instantiation of internal resources. - */ - @Override - public void init() { - - this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .useHttps(this.useHttps) - .allowTracing(this.allowTracing) - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - logger.info("{}: UEB SINK created", this); - } - - @Override - public String toString() { - return "InlineUebTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" - + super.toString() + "]"; - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java deleted file mode 100644 index ead04594..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; - -/** - * This topic source implementation specializes in reading messages over an UEB Bus topic source and - * notifying its listeners. - */ -public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource { - - /** - * Constructor. - * - * @param busTopicParams Parameters object containing all the required inputs - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) { - super(busTopicParams); - this.init(); - } - - /** - * Initialize the Cambria client. - */ - @Override - public void init() { - this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) - .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps) - .allowTracing(this.allowTracing) - .allowSelfSignedCerts(this.allowSelfSignedCerts).build()); - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; - } - - @Override - public String toString() { - return "SingleThreadedUebTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() - + ", toString()=" + super.toString() + "]"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java index 5f49ea34..0ccc8a75 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java @@ -3,7 +3,7 @@ * ONAP PAP * ================================================================================ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019, 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; /** * Client for sending messages to a Topic using TopicSink. */ +@Getter public class TopicSinkClient { private static final Logger logger = LoggerFactory.getLogger(TopicSinkClient.class); @@ -46,7 +47,6 @@ public class TopicSinkClient { /** * Where messages are published. */ - @Getter private final TopicSink sink; /** diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java index 92dd6483..b6777db7 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java @@ -37,8 +37,6 @@ import org.junit.Test; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicPropertyBuilder; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicPropertyBuilder; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; @@ -49,11 +47,8 @@ public class TopicEndpointProxyTest { private static final String NOOP_SOURCE_TOPIC = "noop-source"; private static final String NOOP_SINK_TOPIC = "noop-sink"; - private static final String UEB_SOURCE_TOPIC = "ueb-source"; - private static final String UEB_SINK_TOPIC = "ueb-sink"; - - private Properties configuration = new Properties(); - private TopicParameterGroup group = new TopicParameterGroup(); + private final Properties configuration = new Properties(); + private final TopicParameterGroup group = new TopicParameterGroup(); /** * Constructor. @@ -74,18 +69,6 @@ public class TopicEndpointProxyTest { configuration.putAll(noopSinkBuilder.build()); group.getTopicSinks().add(noopSinkBuilder.getParams()); - UebTopicPropertyBuilder uebSourceBuilder = - new UebTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS) - .makeTopic(UEB_SOURCE_TOPIC); - configuration.putAll(uebSourceBuilder.build()); - group.getTopicSources().add(uebSourceBuilder.getParams()); - - UebTopicPropertyBuilder uebSinkBuilder = - new UebTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS) - .makeTopic(UEB_SINK_TOPIC); - configuration.putAll(uebSinkBuilder.build()); - group.getTopicSinks().add(uebSinkBuilder.getParams()); - TopicParameters invalidCommInfraParams = new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS) .makeTopic(NOOP_SOURCE_TOPIC).getParams(); @@ -99,19 +82,19 @@ public class TopicEndpointProxyTest { } private <T extends Topic> boolean allSources(List<T> topics) { - return exists(topics, NOOP_SOURCE_TOPIC) && exists(topics, UEB_SOURCE_TOPIC); + return exists(topics, NOOP_SOURCE_TOPIC); } private <T extends Topic> boolean allSinks(List<T> topics) { - return exists(topics, NOOP_SINK_TOPIC) && exists(topics, UEB_SINK_TOPIC); + return exists(topics, NOOP_SINK_TOPIC); } private <T extends Topic> boolean anySource(List<T> topics) { - return exists(topics, NOOP_SOURCE_TOPIC) || exists(topics, UEB_SOURCE_TOPIC); + return exists(topics, NOOP_SOURCE_TOPIC); } private <T extends Topic> boolean anySink(List<T> topics) { - return exists(topics, NOOP_SINK_TOPIC) || exists(topics, UEB_SINK_TOPIC); + return exists(topics, NOOP_SINK_TOPIC); } /** @@ -121,9 +104,6 @@ public class TopicEndpointProxyTest { public void tearDown() { NoopTopicFactories.getSinkFactory().destroy(); NoopTopicFactories.getSourceFactory().destroy(); - - UebTopicFactories.getSinkFactory().destroy(); - UebTopicFactories.getSourceFactory().destroy(); } @Test @@ -142,7 +122,7 @@ public class TopicEndpointProxyTest { TopicEndpoint manager = new TopicEndpointProxy(); List<TopicSource> sources = manager.addTopicSources(group.getTopicSources()); - assertSame(2, sources.size()); + assertSame(1, sources.size()); assertTrue(allSources(sources)); assertFalse(anySink(sources)); @@ -153,7 +133,7 @@ public class TopicEndpointProxyTest { TopicEndpoint manager = new TopicEndpointProxy(); List<TopicSource> sources = manager.addTopicSources(configuration); - assertSame(2, sources.size()); + assertSame(1, sources.size()); assertTrue(allSources(sources)); assertFalse(anySink(sources)); @@ -164,7 +144,7 @@ public class TopicEndpointProxyTest { TopicEndpoint manager = new TopicEndpointProxy(); List<TopicSink> sinks = manager.addTopicSinks(group.getTopicSinks()); - assertSame(2, sinks.size()); + assertSame(1, sinks.size()); assertFalse(anySource(sinks)); assertTrue(allSinks(sinks)); @@ -175,7 +155,7 @@ public class TopicEndpointProxyTest { TopicEndpoint manager = new TopicEndpointProxy(); List<TopicSink> sinks = manager.addTopicSinks(configuration); - assertSame(2, sinks.size()); + assertSame(1, sinks.size()); assertFalse(anySource(sinks)); assertTrue(allSinks(sinks)); @@ -186,7 +166,7 @@ public class TopicEndpointProxyTest { TopicEndpoint manager = new TopicEndpointProxy(); List<Topic> topics = manager.addTopics(configuration); - assertSame(4, topics.size()); + assertSame(2, topics.size()); assertTrue(allSources(topics)); assertTrue(allSinks(topics)); @@ -197,7 +177,7 @@ public class TopicEndpointProxyTest { TopicEndpoint manager = new TopicEndpointProxy(); List<Topic> topics = manager.addTopics(group); - assertSame(4, topics.size()); + assertSame(2, topics.size()); assertTrue(allSources(topics)); assertTrue(allSinks(topics)); @@ -236,7 +216,7 @@ public class TopicEndpointProxyTest { manager.addTopicSinks(configuration); List<TopicSource> sources = manager.getTopicSources(); - assertSame(2, sources.size()); + assertSame(1, sources.size()); assertTrue(allSources(sources)); assertFalse(anySink(sources)); @@ -250,21 +230,13 @@ public class TopicEndpointProxyTest { manager.addTopicSinks(configuration); List<TopicSink> sinks = manager.getTopicSinks(); - assertSame(2, sinks.size()); + assertSame(1, sinks.size()); assertFalse(anySource(sinks)); assertTrue(allSinks(sinks)); } @Test - public void testGetUebTopicSources() { - TopicEndpoint manager = new TopicEndpointProxy(); - - manager.addTopicSources(configuration); - assertSame(1, manager.getUebTopicSources().size()); - } - - @Test public void testGetNoopTopicSources() { TopicEndpoint manager = new TopicEndpointProxy(); @@ -273,14 +245,6 @@ public class TopicEndpointProxyTest { } @Test - public void testGetUebTopicSinks() { - TopicEndpoint manager = new TopicEndpointProxy(); - - manager.addTopicSinks(configuration); - assertSame(1, manager.getUebTopicSinks().size()); - } - - @Test public void testGetNoopTopicSinks() { TopicEndpoint manager = new TopicEndpointProxy(); @@ -322,12 +286,9 @@ public class TopicEndpointProxyTest { manager.addTopicSources(configuration); assertSame(NOOP_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC).getTopic()); - assertSame(UEB_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.UEB, UEB_SOURCE_TOPIC).getTopic()); assertThatIllegalStateException() .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SINK_TOPIC)); - assertThatIllegalStateException() - .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.UEB, UEB_SINK_TOPIC)); } @Test @@ -336,38 +297,9 @@ public class TopicEndpointProxyTest { manager.addTopicSinks(configuration); assertSame(NOOP_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SINK_TOPIC).getTopic()); - assertSame(UEB_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.UEB, UEB_SINK_TOPIC).getTopic()); assertThatIllegalStateException() .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC)); - assertThatIllegalStateException() - .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.UEB, UEB_SOURCE_TOPIC)); - } - - @Test - public void testGetUebTopicSource() { - TopicEndpoint manager = new TopicEndpointProxy(); - manager.addTopicSources(configuration); - - assertSame(UEB_SOURCE_TOPIC, manager.getUebTopicSource(UEB_SOURCE_TOPIC).getTopic()); - - assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSource(NOOP_SOURCE_TOPIC)); - - assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSource(null)); - assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSource("")); - } - - @Test - public void testGetUebTopicSink() { - TopicEndpoint manager = new TopicEndpointProxy(); - manager.addTopicSinks(configuration); - - assertSame(UEB_SINK_TOPIC, manager.getUebTopicSink(UEB_SINK_TOPIC).getTopic()); - - assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSink(NOOP_SINK_TOPIC)); - - assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSink(null)); - assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSink("")); } @Test @@ -377,8 +309,6 @@ public class TopicEndpointProxyTest { assertSame(NOOP_SOURCE_TOPIC, manager.getNoopTopicSource(NOOP_SOURCE_TOPIC).getTopic()); - assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSource(UEB_SOURCE_TOPIC)); - assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(null)); assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource("")); } @@ -390,8 +320,6 @@ public class TopicEndpointProxyTest { assertSame(NOOP_SINK_TOPIC, manager.getNoopTopicSink(NOOP_SINK_TOPIC).getTopic()); - assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSink(UEB_SINK_TOPIC)); - assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(null)); assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink("")); } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java index 3986549c..3dfd96dd 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java index c109e70a..52868c44 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022, 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +21,11 @@ package org.onap.policy.common.endpoints.event.comm.bus; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; -import java.util.Arrays; import java.util.Deque; import java.util.LinkedList; import java.util.List; @@ -70,7 +70,7 @@ public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTo // check parameters that were used BusTopicParams params = getLastParams(); - assertEquals(false, params.isAllowSelfSignedCerts()); + assertFalse(params.isAllowSelfSignedCerts()); } @Test @@ -82,9 +82,9 @@ public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTo assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic()); BusTopicParams params = getLastParams(); - assertEquals(true, params.isManaged()); - assertEquals(false, params.isUseHttps()); - assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers()); + assertTrue(params.isManaged()); + assertFalse(params.isUseHttps()); + assertEquals(List.of(KAFKA_SERVER), params.getServers()); assertEquals(MY_TOPIC, params.getTopic()); assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); assertEquals(MY_PARTITION, params.getPartitionId()); @@ -178,7 +178,7 @@ public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTo } /** - * Factory that records the parameters of all of the sinks it creates. + * Factory that records the parameters of all the sinks it creates. */ private static class SinkFactory extends IndexedKafkaTopicSinkFactory { private Deque<BusTopicParams> params = new LinkedList<>(); diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java index 503e5131..483e4e99 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP Policy Engine - Common Modules * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022, 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java index 1c6985ad..392cefe9 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java @@ -2,8 +2,7 @@ * ============LICENSE_START======================================================= * ONAP Policy Engine - Common Modules * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2023 Nordix Foundation. + * Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS; @@ -69,9 +69,9 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic()); BusTopicParams params = getLastParams(); - assertEquals(true, params.isManaged()); - assertEquals(false, params.isUseHttps()); - assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers()); + assertTrue(params.isManaged()); + assertFalse(params.isUseHttps()); + assertEquals(List.of(KAFKA_SERVER), params.getServers()); assertEquals(MY_TOPIC, params.getTopic()); assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); } @@ -154,7 +154,7 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka } /** - * Factory that records the parameters of all of the sources it creates. + * Factory that records the parameters of all the sources it creates. */ private static class SourceFactory extends IndexedKafkaTopicSourceFactory { private Deque<BusTopicParams> params = new LinkedList<>(); diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java index ee2d1d7b..5079e601 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP Policy Engine - Common Modules * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java deleted file mode 100644 index 41dbac8c..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; - -import java.util.Collections; -import org.onap.policy.common.endpoints.event.comm.Topic; - -/** - * Base class for UebTopicXxxFactory tests. - * - * @param <T> type of topic managed by the factory - */ -public abstract class UebTopicFactoryTestBase<T extends Topic> extends BusTopicFactoryTestBase<T> { - - @Override - public void testBuildBusTopicParams_Ex() { - - super.testBuildBusTopicParams_Ex(); - - // null servers - assertThatIllegalArgumentException().as("null servers") - .isThrownBy(() -> buildTopic(makeBuilder().servers(null).build())); - - // empty servers - assertThatIllegalArgumentException().as("empty servers") - .isThrownBy(() -> buildTopic(makeBuilder().servers(Collections.emptyList()).build())); - } -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java deleted file mode 100644 index 42ea6eba..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2024 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_AFT_ENV; -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_API_KEY; -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_API_SECRET; -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_CONS_GROUP; -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_CONS_INST; -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC; -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_FETCH_LIMIT; -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_FETCH_TIMEOUT; -import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX; - -import java.util.List; -import lombok.Getter; -import org.onap.policy.common.endpoints.parameters.TopicParameters; - -@Getter -public class UebTopicPropertyBuilder extends TopicPropertyBuilder { - - public static final String SERVER = "my-server"; - public static final String TOPIC2 = "my-topic-2"; - - private final TopicParameters params = new TopicParameters(); - - /** - * Constructs the object. - * - * @param prefix the prefix for the properties to be built - */ - public UebTopicPropertyBuilder(String prefix) { - super(prefix); - } - - /** - * Adds a topic and configures its properties with default values. - * - * @param topic the topic to be added - * @return this builder - */ - public UebTopicPropertyBuilder makeTopic(String topic) { - addTopic(topic); - - setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC); - setTopicProperty(PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, MY_CONS_GROUP); - setTopicProperty(PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, MY_CONS_INST); - setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true"); - setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true"); - setTopicProperty(PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, "true"); - setTopicProperty(PROPERTY_TOPIC_API_KEY_SUFFIX, MY_API_KEY); - setTopicProperty(PROPERTY_TOPIC_API_SECRET_SUFFIX, MY_API_SECRET); - setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, MY_FETCH_LIMIT); - setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, MY_FETCH_TIMEOUT); - setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION); - setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); - - params.setTopicCommInfrastructure("ueb"); - params.setTopic(topic); - params.setEffectiveTopic(MY_EFFECTIVE_TOPIC); - params.setConsumerGroup(MY_CONS_GROUP); - params.setConsumerInstance(MY_CONS_INST); - params.setManaged(true); - params.setUseHttps(true); - params.setAftEnvironment(MY_AFT_ENV); - params.setAllowSelfSignedCerts(true); - params.setApiKey(MY_API_KEY); - params.setApiSecret(MY_API_SECRET); - params.setFetchLimit(MY_FETCH_LIMIT); - params.setFetchTimeout(MY_FETCH_TIMEOUT); - params.setPartitionId(MY_PARTITION); - params.setServers(List.of(SERVER)); - - return this; - } -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java deleted file mode 100644 index 4896a9df..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS; - -import java.util.Deque; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -public class UebTopicSinkFactoryTest extends UebTopicFactoryTestBase<UebTopicSink> { - - private SinkFactory factory; - - /** - * Creates the object to be tested. - */ - @Before - @Override - public void setUp() { - super.setUp(); - - factory = new SinkFactory(); - } - - @After - public void tearDown() { - factory.destroy(); - } - - @Test - @Override - public void testBuildBusTopicParams() { - super.testBuildBusTopicParams(); - super.testBuildBusTopicParams_Ex(); - } - - @Test - @Override - public void testBuildListOfStringString() { - super.testBuildListOfStringString(); - - // check parameters that were used - BusTopicParams params = getLastParams(); - assertEquals(false, params.isAllowSelfSignedCerts()); - } - - @Test - @Override - public void testBuildProperties() { - super.testBuildProperties(); - super.testBuildProperties_Variations(); - super.testBuildProperties_Multiple(); - - initFactory(); - - assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size()); - - BusTopicParams params = getLastParams(); - assertEquals(MY_PARTITION, params.getPartitionId()); - } - - @Test - @Override - public void testDestroyString_testGet_testInventory() { - super.testDestroyString_testGet_testInventory(); - super.testDestroyString_Ex(); - } - - @Test - @Override - public void testDestroy() { - super.testDestroy(); - } - - @Test - public void testGet() { - super.testGet_Ex(); - } - - @Test - public void testToString() { - assertTrue(factory.toString().startsWith("IndexedUebTopicSinkFactory [")); - } - - @Override - protected void initFactory() { - if (factory != null) { - factory.destroy(); - } - - factory = new SinkFactory(); - } - - @Override - protected List<UebTopicSink> buildTopics(Properties properties) { - return factory.build(properties); - } - - @Override - protected UebTopicSink buildTopic(BusTopicParams params) { - return factory.build(params); - } - - @Override - protected UebTopicSink buildTopic(List<String> servers, String topic) { - return factory.build(servers, topic); - } - - @Override - protected void destroyFactory() { - factory.destroy(); - } - - @Override - protected void destroyTopic(String topic) { - factory.destroy(topic); - } - - @Override - protected List<UebTopicSink> getInventory() { - return factory.inventory(); - } - - @Override - protected UebTopicSink getTopic(String topic) { - return factory.get(topic); - } - - @Override - protected BusTopicParams getLastParams() { - return factory.params.getLast(); - } - - @Override - protected TopicPropertyBuilder makePropBuilder() { - return new UebTopicPropertyBuilder(PROPERTY_UEB_SINK_TOPICS); - } - - /** - * Factory that records the parameters of all of the sinks it creates. - */ - private static class SinkFactory extends IndexedUebTopicSinkFactory { - private Deque<BusTopicParams> params = new LinkedList<>(); - - @Override - protected UebTopicSink makeSink(BusTopicParams busTopicParams) { - params.add(busTopicParams); - return super.makeSink(busTopicParams); - } - } -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java deleted file mode 100644 index 77452604..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP Policy Engine - Common Modules - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import static org.junit.Assert.assertNotNull; - -import org.junit.Test; - -public class UebTopicSinkTest { - - @Test - public void test() { - assertNotNull(UebTopicFactories.getSinkFactory()); - } - -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java deleted file mode 100644 index 81e30756..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP Policy Engine - Common Modules - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX; -import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS; - -import java.util.Deque; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; - -public class UebTopicSourceFactoryTest extends UebTopicFactoryTestBase<UebTopicSource> { - - private SourceFactory factory; - - /** - * Creates the object to be tested. - */ - @Before - @Override - public void setUp() { - super.setUp(); - - factory = new SourceFactory(); - } - - @After - public void tearDown() { - factory.destroy(); - } - - @Test - @Override - public void testBuildBusTopicParams() { - super.testBuildBusTopicParams(); - super.testBuildBusTopicParams_Ex(); - } - - @Test - @Override - public void testBuildProperties() { - - super.testBuildProperties(); - - // check source-specific parameters that were used - BusTopicParams params = factory.params.getFirst(); - assertEquals(MY_CONS_GROUP, params.getConsumerGroup()); - assertEquals(MY_CONS_INST, params.getConsumerInstance()); - assertEquals(MY_FETCH_LIMIT, params.getFetchLimit()); - assertEquals(MY_FETCH_TIMEOUT, params.getFetchTimeout()); - - super.testBuildProperties_Variations(); - super.testBuildProperties_Multiple(); - - // check default values for source-specific parameters - checkDefault(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, - params2 -> params2.getFetchLimit() == PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, - null, "", "invalid-limit-number"); - - checkDefault(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, - params2 -> params2.getFetchTimeout() == PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, - null, "", "invalid-timeout-number"); - } - - @Test - public void testBuildListOfStringStringStringString() { - UebTopicSource source1 = factory.build(servers, MY_TOPIC, MY_API_KEY, MY_API_SECRET); - assertNotNull(source1); - - // check source-specific parameters that were used - BusTopicParams params = factory.params.getFirst(); - assertEquals(MY_API_KEY, params.getApiKey()); - assertEquals(MY_API_SECRET, params.getApiSecret()); - assertEquals(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, params.getFetchLimit()); - assertEquals(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout()); - } - - @Test - @Override - public void testBuildListOfStringString() { - super.testBuildListOfStringString(); - - // check source-specific parameters that were used - BusTopicParams params = factory.params.getFirst(); - assertEquals(null, params.getApiKey()); - assertEquals(null, params.getApiSecret()); - assertEquals(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, params.getFetchLimit()); - assertEquals(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout()); - - assertEquals(true, params.isAllowSelfSignedCerts()); - } - - @Test - @Override - public void testDestroyString_testGet_testInventory() { - super.testDestroyString_testGet_testInventory(); - super.testDestroyString_Ex(); - } - - @Test - @Override - public void testDestroy() { - super.testDestroy(); - } - - @Test - public void testGet() { - super.testGet_Ex(); - } - - @Test - public void testToString() { - assertTrue(factory.toString().startsWith("IndexedUebTopicSourceFactory [")); - } - - @Override - protected void initFactory() { - if (factory != null) { - factory.destroy(); - } - - factory = new SourceFactory(); - } - - @Override - protected List<UebTopicSource> buildTopics(Properties properties) { - return factory.build(properties); - } - - @Override - protected UebTopicSource buildTopic(BusTopicParams params) { - return factory.build(params); - } - - @Override - protected UebTopicSource buildTopic(List<String> servers, String topic) { - return factory.build(servers, topic); - } - - @Override - protected void destroyFactory() { - factory.destroy(); - } - - @Override - protected void destroyTopic(String topic) { - factory.destroy(topic); - } - - @Override - protected List<UebTopicSource> getInventory() { - return factory.inventory(); - } - - @Override - protected UebTopicSource getTopic(String topic) { - return factory.get(topic); - } - - @Override - protected BusTopicParams getLastParams() { - return factory.params.getLast(); - } - - @Override - protected TopicPropertyBuilder makePropBuilder() { - return new UebTopicPropertyBuilder(PROPERTY_UEB_SOURCE_TOPICS); - } - - /** - * Factory that records the parameters of all of the sources it creates. - */ - private static class SourceFactory extends IndexedUebTopicSourceFactory { - private Deque<BusTopicParams> params = new LinkedList<>(); - - @Override - protected UebTopicSource makeSource(BusTopicParams busTopicParams) { - params.add(busTopicParams); - return super.makeSource(busTopicParams); - } - } -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java deleted file mode 100644 index 9ef8af84..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP Policy Engine - Common Modules - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import static org.junit.Assert.assertNotNull; - -import org.junit.Test; - -public class UebTopicSourceTest { - - @Test - public void test() { - assertNotNull(UebTopicFactories.getSourceFactory()); - } - -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 70fa83c6..2c33a257 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -23,42 +23,35 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import org.apache.commons.collections4.IteratorUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.springframework.test.util.ReflectionTestUtils; public class BusConsumerTest extends TopicTestBase { @@ -68,11 +61,18 @@ public class BusConsumerTest extends TopicTestBase { @Mock KafkaConsumer<String, String> mockedKafkaConsumer; + AutoCloseable closeable; + @Before @Override public void setUp() { super.setUp(); - MockitoAnnotations.initMocks(this); + closeable = MockitoAnnotations.openMocks(this); + } + + @After + public void tearDown() throws Exception { + closeable.close(); } @@ -139,61 +139,6 @@ public class BusConsumerTest extends TopicTestBase { } @Test - public void testCambriaConsumerWrapper() { - // verify that different wrappers can be built - new CambriaConsumerWrapper(makeBuilder().build()); - new CambriaConsumerWrapper(makeBuilder().useHttps(false).build()); - new CambriaConsumerWrapper(makeBuilder().useHttps(true).build()); - new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build()); - new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build()); - new CambriaConsumerWrapper(makeBuilder().apiKey(null).build()); - new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build()); - new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build()); - new CambriaConsumerWrapper(makeBuilder().userName(null).build()); - new CambriaConsumerWrapper(makeBuilder().password(null).build()); - - assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build())) - .doesNotThrowAnyException(); - } - - @Test - public void testCambriaConsumerWrapperFetch() throws Exception { - CambriaConsumer inner = mock(CambriaConsumer.class); - List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2); - when(inner.fetch()).thenReturn(lst); - - CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build()); - ReflectionTestUtils.setField(cons, "consumer", inner); - - assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator())); - - // arrange to throw exception next time fetch is called - IOException ex = new IOException(EXPECTED); - when(inner.fetch()).thenThrow(ex); - - cons.fetchTimeout = 10; - - try { - cons.fetch(); - fail("missing exception"); - - } catch (IOException e) { - assertEquals(ex, e); - } - } - - @Test - public void testCambriaConsumerWrapperClose() { - CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build()); - assertThatCode(cons::close).doesNotThrowAnyException(); - } - - @Test - public void testCambriaConsumerWrapperToString() { - assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString()); - } - - @Test public void testKafkaConsumerWrapper() { // verify that different wrappers can be built assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException(); diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java deleted file mode 100644 index 66029b61..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2024 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import java.io.IOException; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.CambriaPublisherWrapper; - - -public class BusPublisherTest extends TopicTestBase { - - @Before - @Override - public void setUp() { - super.setUp(); - } - - @Test - public void testCambriaPublisherWrapper() { - // verify that different wrappers can be built - new CambriaPublisherWrapper(makeBuilder().build()); - new CambriaPublisherWrapper(makeBuilder().useHttps(false).build()); - new CambriaPublisherWrapper(makeBuilder().useHttps(true).build()); - new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build()); - new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build()); - new CambriaPublisherWrapper(makeBuilder().apiKey(null).build()); - new CambriaPublisherWrapper(makeBuilder().apiSecret(null).build()); - new CambriaPublisherWrapper(makeBuilder().apiKey(null).apiSecret(null).build()); - new CambriaPublisherWrapper(makeBuilder().userName(null).build()); - new CambriaPublisherWrapper(makeBuilder().password(null).build()); - assertThatCode(() -> new CambriaPublisherWrapper(makeBuilder().userName(null).password(null).build())) - .doesNotThrowAnyException(); - } - - @Test - public void testCambriaPublisherWrapperSend() throws Exception { - CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class); - CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build()); - cambria.publisher = pub; - - assertTrue(cambria.send(MY_PARTITION, MY_MESSAGE)); - - // publisher exception - when(pub.send(anyString(), anyString())).thenThrow(new IOException(EXPECTED)); - assertFalse(cambria.send(MY_PARTITION2, MY_MESSAGE2)); - } - - @Test(expected = IllegalArgumentException.class) - public void testCambriaPublisherWrapperSend_InvalidMsg() { - CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build()); - cambria.publisher = mock(CambriaBatchingPublisher.class); - - cambria.send(MY_PARTITION, null); - } - - @Test - public void testCambriaPublisherWrapperClose() { - CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class); - CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build()); - cambria.publisher = pub; - - cambria.close(); - verify(pub).close(); - - // try again, this time with an exception - doThrow(new RuntimeException(EXPECTED)).when(pub).close(); - cambria.close(); - } -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java index 93e5067e..3abb8b10 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +25,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.Arrays; import java.util.LinkedList; +import java.util.List; import java.util.function.BiConsumer; import org.junit.Before; import org.junit.Test; @@ -46,7 +47,7 @@ public class BusTopicParamsTest extends TopicTestBase { assertEquals(addProps, params.getAdditionalProps()); assertEquals(MY_AFT_ENV, params.getAftEnvironment()); - assertEquals(true, params.isAllowSelfSignedCerts()); + assertTrue(params.isAllowSelfSignedCerts()); assertEquals(MY_API_KEY, params.getApiKey()); assertEquals(MY_API_SECRET, params.getApiSecret()); assertEquals(MY_BASE_PATH, params.getBasePath()); @@ -59,7 +60,7 @@ public class BusTopicParamsTest extends TopicTestBase { assertEquals(MY_HOST, params.getHostname()); assertEquals(MY_LAT, params.getLatitude()); assertEquals(MY_LONG, params.getLongitude()); - assertEquals(true, params.isManaged()); + assertTrue(params.isManaged()); assertEquals(MY_PARTITION, params.getPartitionId()); assertEquals(MY_PARTNER, params.getPartner()); assertEquals(MY_PASS, params.getPassword()); @@ -67,16 +68,16 @@ public class BusTopicParamsTest extends TopicTestBase { assertEquals(servers, params.getServers()); assertEquals(MY_TOPIC, params.getTopic()); assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); - assertEquals(true, params.isUseHttps()); + assertTrue(params.isUseHttps()); assertEquals(MY_USERNAME, params.getUserName()); } @Test public void testBooleanGetters() { // ensure that booleans are independent of each other - testBoolean("true:false:false", (bldr, flag) -> bldr.allowSelfSignedCerts(flag)); - testBoolean("false:true:false", (bldr, flag) -> bldr.managed(flag)); - testBoolean("false:false:true", (bldr, flag) -> bldr.useHttps(flag)); + testBoolean("true:false:false", TopicParamsBuilder::allowSelfSignedCerts); + testBoolean("false:true:false", TopicParamsBuilder::managed); + testBoolean("false:false:true", TopicParamsBuilder::useHttps); } @Test @@ -124,15 +125,15 @@ public class BusTopicParamsTest extends TopicTestBase { assertTrue(makeBuilder().port(65536).build().isPortInvalid()); assertTrue(makeBuilder().servers(null).build().isServersInvalid()); assertTrue(makeBuilder().servers(new LinkedList<>()).build().isServersInvalid()); - assertTrue(makeBuilder().servers(Arrays.asList("")).build().isServersInvalid()); - assertFalse(makeBuilder().servers(Arrays.asList("one-server")).build().isServersInvalid()); + assertTrue(makeBuilder().servers(List.of("")).build().isServersInvalid()); + assertFalse(makeBuilder().servers(List.of("one-server")).build().isServersInvalid()); assertTrue(makeBuilder().topic("").build().isTopicInvalid()); assertFalse(makeBuilder().userName("").build().isUserNameValid()); } /** * Tests the boolean methods by applying a function, once with {@code false} and once - * with {@code true}. Verifies that all of the boolean methods return the correct + * with {@code true}. Verifies that all the boolean methods return the correct * value by concatenating them. * * @param expectedTrue the string that is expected when {@code true} is passed to the @@ -147,7 +148,7 @@ public class BusTopicParamsTest extends TopicTestBase { BusTopicParams params = builder.build(); assertEquals("false:false:false", - "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps()); + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps()); // now try the "true" case @@ -155,6 +156,6 @@ public class BusTopicParamsTest extends TopicTestBase { params = builder.build(); assertEquals(expectedTrue, - "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps()); + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps()); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java index e6eec799..7aa70b2a 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +33,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; +import java.util.List; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -124,7 +126,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase { verify(pub).send(MY_PARTITION, MY_MESSAGE); verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE); - assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents())); + assertEquals(List.of(MY_MESSAGE), Arrays.asList(sink.getRecentEvents())); // arrange for send to throw an exception when(pub.send(anyString(), anyString())).thenThrow(new RuntimeException(EXPECTED)); @@ -138,8 +140,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase { @Test(expected = IllegalArgumentException.class) public void testSend_NullMessage() { sink.start(); - BusPublisher pub = mock(BusPublisher.class); - sink.publisher = pub; + sink.publisher = mock(BusPublisher.class); sink.send(null); } @@ -147,16 +148,14 @@ public class InlineBusTopicSinkTest extends TopicTestBase { @Test(expected = IllegalArgumentException.class) public void testSend_EmptyMessage() { sink.start(); - BusPublisher pub = mock(BusPublisher.class); - sink.publisher = pub; + sink.publisher = mock(BusPublisher.class); sink.send(""); } @Test(expected = IllegalStateException.class) public void testSend_NotStarted() { - BusPublisher pub = mock(BusPublisher.class); - sink.publisher = pub; + sink.publisher = mock(BusPublisher.class); sink.send(MY_MESSAGE); } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java deleted file mode 100644 index 674f379f..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.utils.gson.GsonTestUtils; - -public class InlineUebTopicSinkTest extends TopicTestBase { - private InlineUebTopicSink sink; - - /** - * Creates the object to be tested. - */ - @Before - @Override - public void setUp() { - super.setUp(); - - sink = new InlineUebTopicSink(makeBuilder().build()); - } - - @After - public void tearDown() { - sink.shutdown(); - } - - @Test - public void testSerialize() { - assertThatCode(() -> new GsonTestUtils().compareGson(sink, InlineUebTopicSinkTest.class)) - .doesNotThrowAnyException(); - } - - @Test - public void testToString() { - assertTrue(sink.toString().startsWith("InlineUebTopicSink [")); - } - - @Test - public void testInit() { - assertThatCode(() -> sink.init()).doesNotThrowAnyException(); - } - - @Test - public void testGetTopicCommInfrastructure() { - assertEquals(CommInfrastructure.UEB, sink.getTopicCommInfrastructure()); - } - -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java deleted file mode 100644 index 6536d0e8..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.utils.gson.GsonTestUtils; - -public class SingleThreadedUebTopicSourceTest extends TopicTestBase { - private SingleThreadedUebTopicSource source; - - /** - * Creates the object to be tested. - */ - @Before - @Override - public void setUp() { - super.setUp(); - - source = new SingleThreadedUebTopicSource(makeBuilder().build()); - } - - @After - public void tearDown() { - source.shutdown(); - } - - @Test - public void testSerialize() { - assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedUebTopicSourceTest.class)) - .doesNotThrowAnyException(); - } - - @Test - public void testToString() { - assertTrue(source.toString().startsWith("SingleThreadedUebTopicSource [")); - source.shutdown(); - } - - @Test - public void testGetTopicCommInfrastructure() { - assertEquals(CommInfrastructure.UEB, source.getTopicCommInfrastructure()); - } - -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java index 2605c14b..704b2cb0 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,7 +69,7 @@ public class BidirectionalTopicClientTest { private static final String SOURCE_TOPIC = "my-source-topic"; private static final String MY_TEXT = "my-text"; - private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB; + private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP; private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP; @Mock diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json index 48380426..5b2e712a 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json @@ -1,55 +1,30 @@ { - "locked" : false, - "alive" : false, - "topicSources" : [ { - "servers" : [ "my-server" ], - "topic" : "ueb-source", - "effectiveTopic" : "my-effective-topic", - "recentEvents" : [ ], - "alive" : false, - "locked" : false, - "apiKey" : "my-api-key", - "apiSecret" : "my-api-secret", - "useHttps" : true, - "allowTracing": false, - "allowSelfSignedCerts" : true, - "consumerGroup" : "${obj.topicSources[0].consumerGroup}", - "consumerInstance" : "${obj.topicSources[0].consumerInstance}", - "fetchTimeout" : 101, - "fetchLimit" : 100, - "topicCommInfrastructure" : "UEB" - }, + "locked": false, + "alive": false, + "topicSources": [ { - "servers" : [ "my-server" ], - "topic" : "noop-source", - "effectiveTopic" : "noop-source", - "recentEvents" : [ ], - "alive" : false, - "locked" : false, - "topicCommInfrastructure" : "NOOP" - } ], - "topicSinks" : [ { - "servers" : [ "my-server" ], - "topic" : "ueb-sink", - "effectiveTopic" : "my-effective-topic", - "recentEvents" : [ ], - "alive" : false, - "locked" : false, - "apiKey" : "my-api-key", - "apiSecret" : "my-api-secret", - "useHttps" : true, - "allowTracing": false, - "allowSelfSignedCerts" : true, - "topicCommInfrastructure" : "UEB", - "partitionKey" : "${obj.topicSinks[0].partitionKey}" - }, + "servers": [ + "my-server" + ], + "topic": "noop-source", + "effectiveTopic": "noop-source", + "recentEvents": [], + "alive": false, + "locked": false, + "topicCommInfrastructure": "NOOP" + } + ], + "topicSinks": [ { - "servers" : [ "my-server" ], - "topic" : "noop-sink", - "effectiveTopic" : "noop-sink", - "recentEvents" : [ ], - "alive" : false, - "locked" : false, - "topicCommInfrastructure" : "NOOP" - } ] + "servers": [ + "my-server" + ], + "topic": "noop-sink", + "effectiveTopic": "noop-sink", + "recentEvents": [], + "alive": false, + "locked": false, + "topicCommInfrastructure": "NOOP" + } + ] } diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json deleted file mode 100644 index 6dda9b9e..00000000 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "servers" : [ "svra", "svrb" ], - "topic" : "my-topic", - "effectiveTopic" : "my-effective-topic", - "recentEvents" : [ ], - "alive" : false, - "locked" : false, - "apiKey" : "my-api-key", - "apiSecret" : "my-api-secret", - "useHttps" : true, - "allowTracing": true, - "allowSelfSignedCerts" : true, - "topicCommInfrastructure" : "UEB", - "partitionKey" : "my-partition" -} diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json deleted file mode 100644 index 13ee6bc6..00000000 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "servers" : [ "svra", "svrb" ], - "topic" : "my-topic", - "effectiveTopic" : "my-effective-topic", - "recentEvents" : [ ], - "alive" : false, - "locked" : false, - "apiKey" : "my-api-key", - "apiSecret" : "my-api-secret", - "useHttps" : true, - "allowTracing": true, - "allowSelfSignedCerts" : true, - "consumerGroup" : "my-cons-group", - "consumerInstance" : "my-cons-inst", - "fetchTimeout" : 101, - "fetchLimit" : 100, - "topicCommInfrastructure" : "UEB" -} |