summaryrefslogtreecommitdiffstats
path: root/policy-endpoints
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints')
-rw-r--r--policy-endpoints/pom.xml104
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java40
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java68
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java202
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java216
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java41
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java28
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java92
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java29
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java104
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java97
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java89
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java87
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java74
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java4
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java100
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java2
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java14
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java2
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java12
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java2
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java48
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java107
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java177
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java34
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java210
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java34
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java73
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java100
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java25
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java13
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java74
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java70
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java3
-rw-r--r--policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json77
-rw-r--r--policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json15
-rw-r--r--policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json18
38 files changed, 137 insertions, 2352 deletions
diff --git a/policy-endpoints/pom.xml b/policy-endpoints/pom.xml
index c6dd4b6e..9bc0c883 100644
--- a/policy-endpoints/pom.xml
+++ b/policy-endpoints/pom.xml
@@ -36,10 +36,6 @@
<name>policy-endpoints</name>
<description>Endpoints</description>
- <properties>
- <cambria.version>1.2.1-oss</cambria.version>
- </properties>
-
<dependencyManagement>
<dependencies>
<dependency>
@@ -67,6 +63,7 @@
<groupId>org.onap.policy.common</groupId>
<artifactId>utils</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.onap.policy.common</groupId>
@@ -80,48 +77,18 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.att.nsa</groupId>
- <artifactId>cambriaClient</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-jakarta-xmlbind-annotations</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-servlet</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.media</groupId>
- <artifactId>jersey-media-json-jackson</artifactId>
- <version>${version.jersey}</version>
- <scope>compile</scope>
+ <artifactId>jersey-container-servlet-core</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
@@ -132,22 +99,7 @@
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>${version.jersey}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents.client5</groupId>
- <artifactId>httpclient5</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents.core5</groupId>
- <artifactId>httpcore5</artifactId>
- </dependency>
- <dependency>
- <groupId>jakarta.activation</groupId>
- <artifactId>jakarta.activation-api</artifactId>
- </dependency>
- <dependency>
- <groupId>jakarta.inject</groupId>
- <artifactId>jakarta.inject-api</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -158,32 +110,66 @@
<artifactId>jetty-security</artifactId>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-jexl3</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
</dependency>
<dependency>
- <groupId>io.opentelemetry</groupId>
- <artifactId>opentelemetry-exporter-otlp</artifactId>
+ <groupId>io.swagger.core.v3</groupId>
+ <artifactId>swagger-jaxrs2-jakarta</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>io.opentelemetry</groupId>
- <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
+ <groupId>io.swagger.core.v3</groupId>
+ <artifactId>swagger-jaxrs2-servlet-initializer-v2-jakarta</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.sun.xml.bind</groupId>
- <artifactId>jaxb-impl</artifactId>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_hotspot</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_servlet_jakarta</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-test</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
index d09e7353..ce8e2387 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
@@ -37,10 +37,6 @@ public interface Topic extends TopicRegisterable, Startable, Lockable {
*/
enum CommInfrastructure {
/**
- * UEB Communication Infrastructure.
- */
- UEB,
- /**
* KAFKA Communication Infrastructure.
*/
KAFKA,
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
index 8cae5bd1..bf261def 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
@@ -29,8 +29,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
@@ -129,18 +127,6 @@ public interface TopicEndpoint extends Startable, Lockable {
TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName);
/**
- * Get the UEB Topic Source for the given topic name.
- *
- * @param topicName the topic name
- *
- * @return the UEB Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSource getUebTopicSource(String topicName);
-
- /**
* Get the Noop Source for the given topic name.
*
* @param topicName the topic name.
@@ -198,18 +184,6 @@ public interface TopicEndpoint extends Startable, Lockable {
TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName);
/**
- * Get the UEB Topic Source for the given topic name.
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSink getUebTopicSink(String topicName);
-
- /**
* Get the no-op Topic Sink for the given topic name.
*
* @param topicName the topic name
@@ -234,13 +208,6 @@ public interface TopicEndpoint extends Startable, Lockable {
KafkaTopicSink getKafkaTopicSink(String topicName);
/**
- * Gets only the UEB Topic Sources.
- *
- * @return the UEB Topic Source List
- */
- List<UebTopicSource> getUebTopicSources();
-
- /**
* Gets only the KAFKA Topic Sources.
*
* @return the KAFKA Topic Source List
@@ -255,13 +222,6 @@ public interface TopicEndpoint extends Startable, Lockable {
List<NoopTopicSource> getNoopTopicSources();
/**
- * Gets only the UEB Topic Sinks.
- *
- * @return the UEB Topic Sink List
- */
- List<UebTopicSink> getUebTopicSinks();
-
- /**
* Gets only the KAFKA Topic Sinks.
*
* @return the KAFKA Topic Sinks List
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
index 4ec8eb54..d9e55ddd 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
@@ -34,9 +34,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
@@ -90,9 +87,6 @@ class TopicEndpointProxy implements TopicEndpoint {
for (TopicParameters param : paramList) {
switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
- case UEB:
- sources.add(UebTopicFactories.getSourceFactory().build(param));
- break;
case KAFKA:
sources.add(KafkaTopicFactories.getSourceFactory().build(param));
break;
@@ -114,13 +108,11 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSource> addTopicSources(Properties properties) {
- // 1. Create UEB Sources
- // 2. Create KAFKA Sources
- // 3. Create NOOP Sources
+ // 1. Create KAFKA Sources
+ // 2. Create NOOP Sources
List<TopicSource> sources = new ArrayList<>();
- sources.addAll(UebTopicFactories.getSourceFactory().build(properties));
sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties));
sources.addAll(NoopTopicFactories.getSourceFactory().build(properties));
@@ -141,9 +133,6 @@ class TopicEndpointProxy implements TopicEndpoint {
for (TopicParameters param : paramList) {
switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
- case UEB:
- sinks.add(UebTopicFactories.getSinkFactory().build(param));
- break;
case KAFKA:
sinks.add(KafkaTopicFactories.getSinkFactory().build(param));
break;
@@ -164,13 +153,11 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSink> addTopicSinks(Properties properties) {
- // 1. Create UEB Sinks
- // 2. Create KAFKA Sinks
- // 3. Create NOOP Sinks
+ // 1. Create KAFKA Sinks
+ // 2. Create NOOP Sinks
final List<TopicSink> sinks = new ArrayList<>();
- sinks.addAll(UebTopicFactories.getSinkFactory().build(properties));
sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties));
sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties));
@@ -190,7 +177,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSource> sources = new ArrayList<>();
- sources.addAll(UebTopicFactories.getSourceFactory().inventory());
sources.addAll(KafkaTopicFactories.getSourceFactory().inventory());
sources.addAll(NoopTopicFactories.getSourceFactory().inventory());
@@ -208,12 +194,6 @@ class TopicEndpointProxy implements TopicEndpoint {
topicNames.forEach(topic -> {
try {
- sources.add(Objects.requireNonNull(this.getUebTopicSource(topic)));
- } catch (final Exception e) {
- logger.debug("No UEB source for topic: {}", topic, e);
- }
-
- try {
sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic)));
} catch (final Exception e) {
logger.debug("No KAFKA source for topic: {}", topic, e);
@@ -234,7 +214,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
- sinks.addAll(UebTopicFactories.getSinkFactory().inventory());
sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory());
sinks.addAll(NoopTopicFactories.getSinkFactory().inventory());
@@ -251,12 +230,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
for (final String topic : topicNames) {
try {
- sinks.add(Objects.requireNonNull(this.getUebTopicSink(topic)));
- } catch (final Exception e) {
- logger.debug("No UEB sink for topic: {}", topic, e);
- }
-
- try {
sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic)));
} catch (final Exception e) {
logger.debug("No KAFKA sink for topic: {}", topic, e);
@@ -280,12 +253,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
try {
- sinks.add(this.getUebTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
sinks.add(this.getKafkaTopicSink(topicName));
} catch (final Exception e) {
logNoSink(topicName, e);
@@ -302,12 +269,6 @@ class TopicEndpointProxy implements TopicEndpoint {
@GsonJsonIgnore
@Override
- public List<UebTopicSource> getUebTopicSources() {
- return UebTopicFactories.getSourceFactory().inventory();
- }
-
- @GsonJsonIgnore
- @Override
public List<KafkaTopicSource> getKafkaTopicSources() {
return KafkaTopicFactories.getSourceFactory().inventory();
}
@@ -318,12 +279,6 @@ class TopicEndpointProxy implements TopicEndpoint {
return NoopTopicFactories.getSourceFactory().inventory();
}
- @GsonJsonIgnore
- @Override
- public List<UebTopicSink> getUebTopicSinks() {
- return UebTopicFactories.getSinkFactory().inventory();
- }
-
@Override
@GsonJsonIgnore
public List<KafkaTopicSink> getKafkaTopicSinks() {
@@ -411,9 +366,6 @@ class TopicEndpointProxy implements TopicEndpoint {
public void shutdown() {
this.stop();
- UebTopicFactories.getSourceFactory().destroy();
- UebTopicFactories.getSinkFactory().destroy();
-
KafkaTopicFactories.getSourceFactory().destroy();
KafkaTopicFactories.getSinkFactory().destroy();
@@ -478,7 +430,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
return switch (commType) {
- case UEB -> this.getUebTopicSource(topicName);
case KAFKA -> this.getKafkaTopicSource(topicName);
case NOOP -> this.getNoopTopicSource(topicName);
default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
@@ -496,7 +447,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
return switch (commType) {
- case UEB -> this.getUebTopicSink(topicName);
case KAFKA -> this.getKafkaTopicSink(topicName);
case NOOP -> this.getNoopTopicSink(topicName);
default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
@@ -504,16 +454,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
@Override
- public UebTopicSource getUebTopicSource(String topicName) {
- return UebTopicFactories.getSourceFactory().get(topicName);
- }
-
- @Override
- public UebTopicSink getUebTopicSink(String topicName) {
- return UebTopicFactories.getSinkFactory().get(topicName);
- }
-
- @Override
public KafkaTopicSource getKafkaTopicSource(String topicName) {
return KafkaTopicFactories.getSourceFactory().get(topicName);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java
deleted file mode 100644
index b04fc078..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import com.google.re2j.Pattern;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.onap.policy.common.endpoints.utils.PropertyUtils;
-import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory of UEB Reader Topics indexed by topic name.
- */
-class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
- private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger.
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
-
- /**
- * UEB Topic Name Index.
- */
- protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
-
- @Override
- public UebTopicSink build(BusTopicParams busTopicParams) {
-
- if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
- throw new IllegalArgumentException("UEB Server(s) must be provided");
- }
-
- if (StringUtils.isBlank(busTopicParams.getTopic())) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
- return uebTopicSinks.get(busTopicParams.getTopic());
- }
-
- UebTopicSink uebTopicWriter = makeSink(busTopicParams);
-
- if (busTopicParams.isManaged()) {
- uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
- }
-
- return uebTopicWriter;
- }
- }
-
-
- @Override
- public UebTopicSink build(List<String> servers, String topic) {
- return this.build(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .managed(true)
- .useHttps(false)
- .allowSelfSignedCerts(false)
- .build());
- }
-
-
- @Override
- public List<UebTopicSink> build(Properties properties) {
-
- String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
- if (StringUtils.isBlank(writeTopics)) {
- logger.info("{}: no topic for UEB Sink", this);
- return new ArrayList<>();
- }
-
- List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
- synchronized (this) {
- for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
- addTopic(newUebTopicSinks, topic, properties);
- }
- return newUebTopicSinks;
- }
- }
-
- private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) {
- if (this.uebTopicSinks.containsKey(topic)) {
- newUebTopicSinks.add(this.uebTopicSinks.get(topic));
- return;
- }
-
- String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic;
-
- var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
-
- String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (StringUtils.isBlank(servers)) {
- logger.error("{}: no UEB servers configured for sink {}", this, topic);
- return;
- }
-
- UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
- .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
- .build());
- newUebTopicSinks.add(uebTopicWriter);
- }
-
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- UebTopicSink uebTopicWriter;
- synchronized (this) {
- if (!uebTopicSinks.containsKey(topic)) {
- return;
- }
-
- uebTopicWriter = uebTopicSinks.remove(topic);
- }
-
- uebTopicWriter.shutdown();
- }
-
- @Override
- public void destroy() {
- List<UebTopicSink> writers = this.inventory();
- for (UebTopicSink writer : writers) {
- writer.shutdown();
- }
-
- synchronized (this) {
- this.uebTopicSinks.clear();
- }
- }
-
- @Override
- public UebTopicSink get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSinks.containsKey(topic)) {
- return uebTopicSinks.get(topic);
- } else {
- throw new IllegalStateException("UebTopicSink for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<UebTopicSink> inventory() {
- return new ArrayList<>(this.uebTopicSinks.values());
- }
-
- /**
- * Makes a new sink.
- *
- * @param busTopicParams parameters to use to configure the sink
- * @return a new sink
- */
- protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
- return new InlineUebTopicSink(busTopicParams);
- }
-
-
- @Override
- public String toString() {
- return "IndexedUebTopicSinkFactory " + uebTopicSinks.keySet();
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
deleted file mode 100644
index 09500978..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import com.google.re2j.Pattern;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.onap.policy.common.endpoints.utils.PropertyUtils;
-import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory of UEB Source Topics indexed by topic name.
- */
-class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
- private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger.
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
-
- /**
- * UEB Topic Name Index.
- */
- protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
-
- @Override
- public UebTopicSource build(BusTopicParams busTopicParams) {
- if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
- throw new IllegalArgumentException("UEB Server(s) must be provided");
- }
-
- if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
- return uebTopicSources.get(busTopicParams.getTopic());
- }
-
- var uebTopicSource = makeSource(busTopicParams);
-
- if (busTopicParams.isManaged()) {
- uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
- }
-
- return uebTopicSource;
- }
- }
-
- @Override
- public List<UebTopicSource> build(Properties properties) {
-
- String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
- if (StringUtils.isBlank(readTopics)) {
- logger.info("{}: no topic for UEB Source", this);
- return new ArrayList<>();
- }
-
- List<UebTopicSource> newUebTopicSources = new ArrayList<>();
- synchronized (this) {
- for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
- addTopic(newUebTopicSources, topic, properties);
- }
- }
- return newUebTopicSources;
- }
-
- @Override
- public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
-
- return this.build(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
- .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
- .managed(true)
- .useHttps(false)
- .allowSelfSignedCerts(true).build());
- }
-
- @Override
- public UebTopicSource build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null);
- }
-
- private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) {
- if (this.uebTopicSources.containsKey(topic)) {
- newUebTopicSources.add(this.uebTopicSources.get(topic));
- return;
- }
-
- String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic;
-
- var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
-
- String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (StringUtils.isBlank(servers)) {
- logger.error("{}: no UEB servers configured for sink {}", this, topic);
- return;
- }
-
- var uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
- .consumerGroup(props.getString(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
- .consumerInstance(props.getString(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
- .fetchTimeout(props.getInteger(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
- PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
- .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
- PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
- .build());
-
- newUebTopicSources.add(uebTopicSource);
- }
-
- /**
- * Makes a new source.
- *
- * @param busTopicParams parameters to use to configure the source
- * @return a new source
- */
- protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
- return new SingleThreadedUebTopicSource(busTopicParams);
- }
-
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- UebTopicSource uebTopicSource;
-
- synchronized (this) {
- if (!uebTopicSources.containsKey(topic)) {
- return;
- }
-
- uebTopicSource = uebTopicSources.remove(topic);
- }
-
- uebTopicSource.shutdown();
- }
-
- @Override
- public void destroy() {
- List<UebTopicSource> readers = this.inventory();
- for (UebTopicSource reader : readers) {
- reader.shutdown();
- }
-
- synchronized (this) {
- this.uebTopicSources.clear();
- }
- }
-
- @Override
- public UebTopicSource get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSources.containsKey(topic)) {
- return uebTopicSources.get(topic);
- } else {
- throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<UebTopicSource> inventory() {
- return new ArrayList<>(this.uebTopicSources.values());
- }
-
- @Override
- public String toString() {
- return "IndexedUebTopicSourceFactory " + uebTopicSources.keySet();
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java
deleted file mode 100644
index 721f2135..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class UebTopicFactories {
-
- /**
- * Factory for instantiation and management of sinks.
- */
- @Getter
- private static final UebTopicSinkFactory sinkFactory = new IndexedUebTopicSinkFactory();
-
- /**
- * Factory for instantiation and management of sources.
- */
- @Getter
- private static final UebTopicSourceFactory sourceFactory = new IndexedUebTopicSourceFactory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
deleted file mode 100644
index acfef6da..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-/**
- * Topic Writer over UEB Infrastructure.
- */
-public interface UebTopicSink extends BusTopicSink {
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
deleted file mode 100644
index 0cf095fd..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import java.util.List;
-import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-
-/**
- * UEB Topic Sink Factory.
- */
-public interface UebTopicSinkFactory {
-
- /**
- * Instantiates a new UEB Topic Writer.
- *
- * @param busTopicParams parameters object
- * @return an UEB Topic Sink
- */
- UebTopicSink build(BusTopicParams busTopicParams);
-
- /**
- * Creates an UEB Topic Writer based on properties files.
- *
- * @param properties Properties containing initialization values
- *
- * @return an UEB Topic Writer
- * @throws IllegalArgumentException if invalid parameters are present
- */
- List<UebTopicSink> build(Properties properties);
-
- /**
- * Instantiates a new UEB Topic Writer.
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an UEB Topic Writer
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSink build(List<String> servers, String topic);
-
- /**
- * Destroys an UEB Topic Writer based on a topic.
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- void destroy(String topic);
-
- /**
- * Destroys all UEB Topic Writers.
- */
- void destroy();
-
- /**
- * gets an UEB Topic Writer based on topic name.
- *
- * @param topic the topic name
- *
- * @return an UEB Topic Writer with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the UEB Topic Reader is an incorrect state
- */
- UebTopicSink get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Writers.
- *
- * @return a list of the UEB Topic Writers
- */
- List<UebTopicSink> inventory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
deleted file mode 100644
index 56534309..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-/**
- * Topic Source for UEB Communication Infrastructure.
- *
- */
-public interface UebTopicSource extends BusTopicSource {
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
deleted file mode 100644
index beacee3b..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import java.util.List;
-import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-
-/**
- * UEB Topic Source Factory.
- */
-public interface UebTopicSourceFactory {
-
- /**
- * Creates an UEB Topic Source based on properties files.
- *
- * @param properties Properties containing initialization values
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- List<UebTopicSource> build(Properties properties);
-
- /**
- * Instantiates a new UEB Topic Source.
- *
- * @param busTopicParams parameters object
- * @return an UEB Topic Source
- */
- UebTopicSource build(BusTopicParams busTopicParams);
-
- /**
- * Instantiates a new UEB Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
-
- /**
- * Instantiates a new UEB Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- UebTopicSource build(List<String> servers, String topic);
-
- /**
- * Destroys an UEB Topic Source based on a topic.
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- void destroy(String topic);
-
- /**
- * Destroys all UEB Topic Sources.
- */
- void destroy();
-
- /**
- * Gets an UEB Topic Source based on topic name.
- *
- * @param topic the topic name
- * @return an UEB Topic Source with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the UEB Topic Source is an incorrect state
- */
- UebTopicSource get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Sources.
- *
- * @return a list of the UEB Topic Sources
- */
- List<UebTopicSource> inventory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 3c57d1ba..b46c2715 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -23,9 +23,6 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
@@ -33,9 +30,7 @@ import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -138,98 +133,6 @@ public interface BusConsumer {
}
/**
- * Cambria based consumer.
- */
- public static class CambriaConsumerWrapper extends FetchingBusConsumer {
-
- /**
- * logger.
- */
- private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
-
- /**
- * Used to build the consumer.
- */
- private final ConsumerBuilder builder;
-
- /**
- * Cambria client.
- */
- private final CambriaConsumer consumer;
-
- /**
- * Cambria Consumer Wrapper.
- * BusTopicParam object contains the following parameters
- * servers - messaging bus hosts.
- * topic - topic for messages
- * apiKey - API Key
- * apiSecret - API Secret
- * consumerGroup - Consumer Group
- * consumerInstance - Consumer Instance
- * fetchTimeout - Fetch Timeout
- * fetchLimit - Fetch Limit
- *
- * @param busTopicParams - The parameters for the bus topic
- */
- public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
- super(busTopicParams);
-
- this.builder = new CambriaClientBuilders.ConsumerBuilder();
-
- builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
- .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
- .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
-
- // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(fetchTimeout + 30000);
-
- if (busTopicParams.isUseHttps()) {
- builder.usingHttps();
-
- if (busTopicParams.isAllowSelfSignedCerts()) {
- builder.allowSelfSignedCertificates();
- }
- }
-
- if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
- builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
- }
-
- if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
- builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
- }
-
- try {
- this.consumer = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- try {
- return this.consumer.fetch();
- } catch (final IOException e) { //NOSONAR
- logger.error("{}: cannot fetch because of {}", this, e.getMessage());
- sleepAfterFetchFailure();
- throw e;
- }
- }
-
- @Override
- public void close() {
- super.close();
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
- }
- }
-
- /**
* Kafka based consumer.
*/
class KafkaConsumerWrapper extends FetchingBusConsumer {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index e2adde0d..1b57e48e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -23,19 +23,13 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,89 +55,6 @@ public interface BusPublisher {
void close();
/**
- * Cambria based library publisher.
- */
- class CambriaPublisherWrapper implements BusPublisher {
-
- private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
-
- /**
- * The actual Cambria publisher.
- */
- @GsonJsonIgnore
- protected CambriaBatchingPublisher publisher;
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic parameters
- */
- public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
-
- var builder = new CambriaClientBuilders.PublisherBuilder();
-
- builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
-
- // Set read timeout to 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(30000);
-
- if (busTopicParams.isUseHttps()) {
- if (busTopicParams.isAllowSelfSignedCerts()) {
- builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
- } else {
- builder.withConnectionType(ConnectionType.HTTPS);
- }
- }
-
- if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
- builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
- }
-
- if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
- builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
- }
-
- try {
- this.publisher = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public boolean send(String partitionId, String message) {
- if (message == null) {
- throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
- }
-
- try {
- this.publisher.send(partitionId, message);
- } catch (Exception e) {
- logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
- return false;
- }
- return true;
- }
-
- @Override
- public void close() {
- logger.info(LOG_CLOSE, this);
-
- try {
- this.publisher.close();
- } catch (Exception e) {
- logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
- }
- }
-
- @Override
- public String toString() {
- return "CambriaPublisherWrapper []";
- }
-
- }
-
- /**
* Kafka based library publisher.
*/
class KafkaPublisherWrapper implements BusPublisher {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
deleted file mode 100644
index 896cb3bb..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This implementation publishes events for the associated UEB topic, inline with the calling
- * thread.
- */
-public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink {
-
- /**
- * Logger.
- */
- private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
-
- /**
- * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned
- * attributes.
- *
- * <p>servers list of UEB servers available for publishing
- * topic the topic to publish to
- * apiKey the api key (optional)
- * apiSecret the api secret (optional)
- * partitionId the partition key (optional, autogenerated if not provided)
- * useHttps does connection use HTTPS?
- * allowTracing is tracing allowed?
- * allowSelfSignedCerts are self-signed certificates allow
- * @param busTopicParams contains attributes needed
- * @throws IllegalArgumentException if invalid arguments are detected
- */
- public InlineUebTopicSink(BusTopicParams busTopicParams) {
- super(busTopicParams);
- }
-
- /**
- * Instantiation of internal resources.
- */
- @Override
- public void init() {
-
- this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- logger.info("{}: UEB SINK created", this);
- }
-
- @Override
- public String toString() {
- return "InlineUebTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
- + super.toString() + "]";
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.UEB;
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
deleted file mode 100644
index ead04594..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
-
-/**
- * This topic source implementation specializes in reading messages over an UEB Bus topic source and
- * notifying its listeners.
- */
-public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource {
-
- /**
- * Constructor.
- *
- * @param busTopicParams Parameters object containing all the required inputs
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) {
- super(busTopicParams);
- this.init();
- }
-
- /**
- * Initialize the Cambria client.
- */
- @Override
- public void init() {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
- .fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing)
- .allowSelfSignedCerts(this.allowSelfSignedCerts).build());
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.UEB;
- }
-
- @Override
- public String toString() {
- return "SingleThreadedUebTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure()
- + ", toString()=" + super.toString() + "]";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
index 5f49ea34..0ccc8a75 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
@@ -3,7 +3,7 @@
* ONAP PAP
* ================================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
/**
* Client for sending messages to a Topic using TopicSink.
*/
+@Getter
public class TopicSinkClient {
private static final Logger logger = LoggerFactory.getLogger(TopicSinkClient.class);
@@ -46,7 +47,6 @@ public class TopicSinkClient {
/**
* Where messages are published.
*/
- @Getter
private final TopicSink sink;
/**
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java
index 92dd6483..b6777db7 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java
@@ -37,8 +37,6 @@ import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicPropertyBuilder;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicPropertyBuilder;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
@@ -49,11 +47,8 @@ public class TopicEndpointProxyTest {
private static final String NOOP_SOURCE_TOPIC = "noop-source";
private static final String NOOP_SINK_TOPIC = "noop-sink";
- private static final String UEB_SOURCE_TOPIC = "ueb-source";
- private static final String UEB_SINK_TOPIC = "ueb-sink";
-
- private Properties configuration = new Properties();
- private TopicParameterGroup group = new TopicParameterGroup();
+ private final Properties configuration = new Properties();
+ private final TopicParameterGroup group = new TopicParameterGroup();
/**
* Constructor.
@@ -74,18 +69,6 @@ public class TopicEndpointProxyTest {
configuration.putAll(noopSinkBuilder.build());
group.getTopicSinks().add(noopSinkBuilder.getParams());
- UebTopicPropertyBuilder uebSourceBuilder =
- new UebTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS)
- .makeTopic(UEB_SOURCE_TOPIC);
- configuration.putAll(uebSourceBuilder.build());
- group.getTopicSources().add(uebSourceBuilder.getParams());
-
- UebTopicPropertyBuilder uebSinkBuilder =
- new UebTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS)
- .makeTopic(UEB_SINK_TOPIC);
- configuration.putAll(uebSinkBuilder.build());
- group.getTopicSinks().add(uebSinkBuilder.getParams());
-
TopicParameters invalidCommInfraParams =
new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS)
.makeTopic(NOOP_SOURCE_TOPIC).getParams();
@@ -99,19 +82,19 @@ public class TopicEndpointProxyTest {
}
private <T extends Topic> boolean allSources(List<T> topics) {
- return exists(topics, NOOP_SOURCE_TOPIC) && exists(topics, UEB_SOURCE_TOPIC);
+ return exists(topics, NOOP_SOURCE_TOPIC);
}
private <T extends Topic> boolean allSinks(List<T> topics) {
- return exists(topics, NOOP_SINK_TOPIC) && exists(topics, UEB_SINK_TOPIC);
+ return exists(topics, NOOP_SINK_TOPIC);
}
private <T extends Topic> boolean anySource(List<T> topics) {
- return exists(topics, NOOP_SOURCE_TOPIC) || exists(topics, UEB_SOURCE_TOPIC);
+ return exists(topics, NOOP_SOURCE_TOPIC);
}
private <T extends Topic> boolean anySink(List<T> topics) {
- return exists(topics, NOOP_SINK_TOPIC) || exists(topics, UEB_SINK_TOPIC);
+ return exists(topics, NOOP_SINK_TOPIC);
}
/**
@@ -121,9 +104,6 @@ public class TopicEndpointProxyTest {
public void tearDown() {
NoopTopicFactories.getSinkFactory().destroy();
NoopTopicFactories.getSourceFactory().destroy();
-
- UebTopicFactories.getSinkFactory().destroy();
- UebTopicFactories.getSourceFactory().destroy();
}
@Test
@@ -142,7 +122,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<TopicSource> sources = manager.addTopicSources(group.getTopicSources());
- assertSame(2, sources.size());
+ assertSame(1, sources.size());
assertTrue(allSources(sources));
assertFalse(anySink(sources));
@@ -153,7 +133,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<TopicSource> sources = manager.addTopicSources(configuration);
- assertSame(2, sources.size());
+ assertSame(1, sources.size());
assertTrue(allSources(sources));
assertFalse(anySink(sources));
@@ -164,7 +144,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<TopicSink> sinks = manager.addTopicSinks(group.getTopicSinks());
- assertSame(2, sinks.size());
+ assertSame(1, sinks.size());
assertFalse(anySource(sinks));
assertTrue(allSinks(sinks));
@@ -175,7 +155,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<TopicSink> sinks = manager.addTopicSinks(configuration);
- assertSame(2, sinks.size());
+ assertSame(1, sinks.size());
assertFalse(anySource(sinks));
assertTrue(allSinks(sinks));
@@ -186,7 +166,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<Topic> topics = manager.addTopics(configuration);
- assertSame(4, topics.size());
+ assertSame(2, topics.size());
assertTrue(allSources(topics));
assertTrue(allSinks(topics));
@@ -197,7 +177,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<Topic> topics = manager.addTopics(group);
- assertSame(4, topics.size());
+ assertSame(2, topics.size());
assertTrue(allSources(topics));
assertTrue(allSinks(topics));
@@ -236,7 +216,7 @@ public class TopicEndpointProxyTest {
manager.addTopicSinks(configuration);
List<TopicSource> sources = manager.getTopicSources();
- assertSame(2, sources.size());
+ assertSame(1, sources.size());
assertTrue(allSources(sources));
assertFalse(anySink(sources));
@@ -250,21 +230,13 @@ public class TopicEndpointProxyTest {
manager.addTopicSinks(configuration);
List<TopicSink> sinks = manager.getTopicSinks();
- assertSame(2, sinks.size());
+ assertSame(1, sinks.size());
assertFalse(anySource(sinks));
assertTrue(allSinks(sinks));
}
@Test
- public void testGetUebTopicSources() {
- TopicEndpoint manager = new TopicEndpointProxy();
-
- manager.addTopicSources(configuration);
- assertSame(1, manager.getUebTopicSources().size());
- }
-
- @Test
public void testGetNoopTopicSources() {
TopicEndpoint manager = new TopicEndpointProxy();
@@ -273,14 +245,6 @@ public class TopicEndpointProxyTest {
}
@Test
- public void testGetUebTopicSinks() {
- TopicEndpoint manager = new TopicEndpointProxy();
-
- manager.addTopicSinks(configuration);
- assertSame(1, manager.getUebTopicSinks().size());
- }
-
- @Test
public void testGetNoopTopicSinks() {
TopicEndpoint manager = new TopicEndpointProxy();
@@ -322,12 +286,9 @@ public class TopicEndpointProxyTest {
manager.addTopicSources(configuration);
assertSame(NOOP_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC).getTopic());
- assertSame(UEB_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.UEB, UEB_SOURCE_TOPIC).getTopic());
assertThatIllegalStateException()
.isThrownBy(() -> manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SINK_TOPIC));
- assertThatIllegalStateException()
- .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.UEB, UEB_SINK_TOPIC));
}
@Test
@@ -336,38 +297,9 @@ public class TopicEndpointProxyTest {
manager.addTopicSinks(configuration);
assertSame(NOOP_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SINK_TOPIC).getTopic());
- assertSame(UEB_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.UEB, UEB_SINK_TOPIC).getTopic());
assertThatIllegalStateException()
.isThrownBy(() -> manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC));
- assertThatIllegalStateException()
- .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.UEB, UEB_SOURCE_TOPIC));
- }
-
- @Test
- public void testGetUebTopicSource() {
- TopicEndpoint manager = new TopicEndpointProxy();
- manager.addTopicSources(configuration);
-
- assertSame(UEB_SOURCE_TOPIC, manager.getUebTopicSource(UEB_SOURCE_TOPIC).getTopic());
-
- assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSource(NOOP_SOURCE_TOPIC));
-
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSource(null));
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSource(""));
- }
-
- @Test
- public void testGetUebTopicSink() {
- TopicEndpoint manager = new TopicEndpointProxy();
- manager.addTopicSinks(configuration);
-
- assertSame(UEB_SINK_TOPIC, manager.getUebTopicSink(UEB_SINK_TOPIC).getTopic());
-
- assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSink(NOOP_SINK_TOPIC));
-
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSink(null));
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSink(""));
}
@Test
@@ -377,8 +309,6 @@ public class TopicEndpointProxyTest {
assertSame(NOOP_SOURCE_TOPIC, manager.getNoopTopicSource(NOOP_SOURCE_TOPIC).getTopic());
- assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSource(UEB_SOURCE_TOPIC));
-
assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(null));
assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(""));
}
@@ -390,8 +320,6 @@ public class TopicEndpointProxyTest {
assertSame(NOOP_SINK_TOPIC, manager.getNoopTopicSink(NOOP_SINK_TOPIC).getTopic());
- assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSink(UEB_SINK_TOPIC));
-
assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(null));
assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(""));
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java
index 3986549c..3dfd96dd 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
index c109e70a..52868c44 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,11 +21,11 @@
package org.onap.policy.common.endpoints.event.comm.bus;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
-import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
@@ -70,7 +70,7 @@ public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTo
// check parameters that were used
BusTopicParams params = getLastParams();
- assertEquals(false, params.isAllowSelfSignedCerts());
+ assertFalse(params.isAllowSelfSignedCerts());
}
@Test
@@ -82,9 +82,9 @@ public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTo
assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
BusTopicParams params = getLastParams();
- assertEquals(true, params.isManaged());
- assertEquals(false, params.isUseHttps());
- assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+ assertTrue(params.isManaged());
+ assertFalse(params.isUseHttps());
+ assertEquals(List.of(KAFKA_SERVER), params.getServers());
assertEquals(MY_TOPIC, params.getTopic());
assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
assertEquals(MY_PARTITION, params.getPartitionId());
@@ -178,7 +178,7 @@ public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTo
}
/**
- * Factory that records the parameters of all of the sinks it creates.
+ * Factory that records the parameters of all the sinks it creates.
*/
private static class SinkFactory extends IndexedKafkaTopicSinkFactory {
private Deque<BusTopicParams> params = new LinkedList<>();
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java
index 503e5131..483e4e99 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP Policy Engine - Common Modules
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
index 1c6985ad..392cefe9 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
@@ -2,8 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP Policy Engine - Common Modules
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +21,7 @@
package org.onap.policy.common.endpoints.event.comm.bus;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
@@ -69,9 +69,9 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
BusTopicParams params = getLastParams();
- assertEquals(true, params.isManaged());
- assertEquals(false, params.isUseHttps());
- assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+ assertTrue(params.isManaged());
+ assertFalse(params.isUseHttps());
+ assertEquals(List.of(KAFKA_SERVER), params.getServers());
assertEquals(MY_TOPIC, params.getTopic());
assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
}
@@ -154,7 +154,7 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
}
/**
- * Factory that records the parameters of all of the sources it creates.
+ * Factory that records the parameters of all the sources it creates.
*/
private static class SourceFactory extends IndexedKafkaTopicSourceFactory {
private Deque<BusTopicParams> params = new LinkedList<>();
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java
index ee2d1d7b..5079e601 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP Policy Engine - Common Modules
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java
deleted file mode 100644
index 41dbac8c..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
-
-import java.util.Collections;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-
-/**
- * Base class for UebTopicXxxFactory tests.
- *
- * @param <T> type of topic managed by the factory
- */
-public abstract class UebTopicFactoryTestBase<T extends Topic> extends BusTopicFactoryTestBase<T> {
-
- @Override
- public void testBuildBusTopicParams_Ex() {
-
- super.testBuildBusTopicParams_Ex();
-
- // null servers
- assertThatIllegalArgumentException().as("null servers")
- .isThrownBy(() -> buildTopic(makeBuilder().servers(null).build()));
-
- // empty servers
- assertThatIllegalArgumentException().as("empty servers")
- .isThrownBy(() -> buildTopic(makeBuilder().servers(Collections.emptyList()).build()));
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java
deleted file mode 100644
index 42ea6eba..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_AFT_ENV;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_API_KEY;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_API_SECRET;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_CONS_GROUP;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_CONS_INST;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_FETCH_LIMIT;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_FETCH_TIMEOUT;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX;
-
-import java.util.List;
-import lombok.Getter;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
-
-@Getter
-public class UebTopicPropertyBuilder extends TopicPropertyBuilder {
-
- public static final String SERVER = "my-server";
- public static final String TOPIC2 = "my-topic-2";
-
- private final TopicParameters params = new TopicParameters();
-
- /**
- * Constructs the object.
- *
- * @param prefix the prefix for the properties to be built
- */
- public UebTopicPropertyBuilder(String prefix) {
- super(prefix);
- }
-
- /**
- * Adds a topic and configures its properties with default values.
- *
- * @param topic the topic to be added
- * @return this builder
- */
- public UebTopicPropertyBuilder makeTopic(String topic) {
- addTopic(topic);
-
- setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, MY_CONS_GROUP);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, MY_CONS_INST);
- setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true");
- setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
- setTopicProperty(PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, "true");
- setTopicProperty(PROPERTY_TOPIC_API_KEY_SUFFIX, MY_API_KEY);
- setTopicProperty(PROPERTY_TOPIC_API_SECRET_SUFFIX, MY_API_SECRET);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, MY_FETCH_LIMIT);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, MY_FETCH_TIMEOUT);
- setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
- setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
-
- params.setTopicCommInfrastructure("ueb");
- params.setTopic(topic);
- params.setEffectiveTopic(MY_EFFECTIVE_TOPIC);
- params.setConsumerGroup(MY_CONS_GROUP);
- params.setConsumerInstance(MY_CONS_INST);
- params.setManaged(true);
- params.setUseHttps(true);
- params.setAftEnvironment(MY_AFT_ENV);
- params.setAllowSelfSignedCerts(true);
- params.setApiKey(MY_API_KEY);
- params.setApiSecret(MY_API_SECRET);
- params.setFetchLimit(MY_FETCH_LIMIT);
- params.setFetchTimeout(MY_FETCH_TIMEOUT);
- params.setPartitionId(MY_PARTITION);
- params.setServers(List.of(SERVER));
-
- return this;
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java
deleted file mode 100644
index 4896a9df..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS;
-
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-
-public class UebTopicSinkFactoryTest extends UebTopicFactoryTestBase<UebTopicSink> {
-
- private SinkFactory factory;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- factory = new SinkFactory();
- }
-
- @After
- public void tearDown() {
- factory.destroy();
- }
-
- @Test
- @Override
- public void testBuildBusTopicParams() {
- super.testBuildBusTopicParams();
- super.testBuildBusTopicParams_Ex();
- }
-
- @Test
- @Override
- public void testBuildListOfStringString() {
- super.testBuildListOfStringString();
-
- // check parameters that were used
- BusTopicParams params = getLastParams();
- assertEquals(false, params.isAllowSelfSignedCerts());
- }
-
- @Test
- @Override
- public void testBuildProperties() {
- super.testBuildProperties();
- super.testBuildProperties_Variations();
- super.testBuildProperties_Multiple();
-
- initFactory();
-
- assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
-
- BusTopicParams params = getLastParams();
- assertEquals(MY_PARTITION, params.getPartitionId());
- }
-
- @Test
- @Override
- public void testDestroyString_testGet_testInventory() {
- super.testDestroyString_testGet_testInventory();
- super.testDestroyString_Ex();
- }
-
- @Test
- @Override
- public void testDestroy() {
- super.testDestroy();
- }
-
- @Test
- public void testGet() {
- super.testGet_Ex();
- }
-
- @Test
- public void testToString() {
- assertTrue(factory.toString().startsWith("IndexedUebTopicSinkFactory ["));
- }
-
- @Override
- protected void initFactory() {
- if (factory != null) {
- factory.destroy();
- }
-
- factory = new SinkFactory();
- }
-
- @Override
- protected List<UebTopicSink> buildTopics(Properties properties) {
- return factory.build(properties);
- }
-
- @Override
- protected UebTopicSink buildTopic(BusTopicParams params) {
- return factory.build(params);
- }
-
- @Override
- protected UebTopicSink buildTopic(List<String> servers, String topic) {
- return factory.build(servers, topic);
- }
-
- @Override
- protected void destroyFactory() {
- factory.destroy();
- }
-
- @Override
- protected void destroyTopic(String topic) {
- factory.destroy(topic);
- }
-
- @Override
- protected List<UebTopicSink> getInventory() {
- return factory.inventory();
- }
-
- @Override
- protected UebTopicSink getTopic(String topic) {
- return factory.get(topic);
- }
-
- @Override
- protected BusTopicParams getLastParams() {
- return factory.params.getLast();
- }
-
- @Override
- protected TopicPropertyBuilder makePropBuilder() {
- return new UebTopicPropertyBuilder(PROPERTY_UEB_SINK_TOPICS);
- }
-
- /**
- * Factory that records the parameters of all of the sinks it creates.
- */
- private static class SinkFactory extends IndexedUebTopicSinkFactory {
- private Deque<BusTopicParams> params = new LinkedList<>();
-
- @Override
- protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
- params.add(busTopicParams);
- return super.makeSink(busTopicParams);
- }
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java
deleted file mode 100644
index 77452604..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine - Common Modules
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.junit.Assert.assertNotNull;
-
-import org.junit.Test;
-
-public class UebTopicSinkTest {
-
- @Test
- public void test() {
- assertNotNull(UebTopicFactories.getSinkFactory());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java
deleted file mode 100644
index 81e30756..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine - Common Modules
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS;
-
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-
-public class UebTopicSourceFactoryTest extends UebTopicFactoryTestBase<UebTopicSource> {
-
- private SourceFactory factory;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- factory = new SourceFactory();
- }
-
- @After
- public void tearDown() {
- factory.destroy();
- }
-
- @Test
- @Override
- public void testBuildBusTopicParams() {
- super.testBuildBusTopicParams();
- super.testBuildBusTopicParams_Ex();
- }
-
- @Test
- @Override
- public void testBuildProperties() {
-
- super.testBuildProperties();
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(MY_CONS_GROUP, params.getConsumerGroup());
- assertEquals(MY_CONS_INST, params.getConsumerInstance());
- assertEquals(MY_FETCH_LIMIT, params.getFetchLimit());
- assertEquals(MY_FETCH_TIMEOUT, params.getFetchTimeout());
-
- super.testBuildProperties_Variations();
- super.testBuildProperties_Multiple();
-
- // check default values for source-specific parameters
- checkDefault(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
- params2 -> params2.getFetchLimit() == PolicyEndPointProperties.DEFAULT_LIMIT_FETCH,
- null, "", "invalid-limit-number");
-
- checkDefault(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
- params2 -> params2.getFetchTimeout() == PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH,
- null, "", "invalid-timeout-number");
- }
-
- @Test
- public void testBuildListOfStringStringStringString() {
- UebTopicSource source1 = factory.build(servers, MY_TOPIC, MY_API_KEY, MY_API_SECRET);
- assertNotNull(source1);
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(MY_API_KEY, params.getApiKey());
- assertEquals(MY_API_SECRET, params.getApiSecret());
- assertEquals(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, params.getFetchLimit());
- assertEquals(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout());
- }
-
- @Test
- @Override
- public void testBuildListOfStringString() {
- super.testBuildListOfStringString();
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(null, params.getApiKey());
- assertEquals(null, params.getApiSecret());
- assertEquals(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, params.getFetchLimit());
- assertEquals(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout());
-
- assertEquals(true, params.isAllowSelfSignedCerts());
- }
-
- @Test
- @Override
- public void testDestroyString_testGet_testInventory() {
- super.testDestroyString_testGet_testInventory();
- super.testDestroyString_Ex();
- }
-
- @Test
- @Override
- public void testDestroy() {
- super.testDestroy();
- }
-
- @Test
- public void testGet() {
- super.testGet_Ex();
- }
-
- @Test
- public void testToString() {
- assertTrue(factory.toString().startsWith("IndexedUebTopicSourceFactory ["));
- }
-
- @Override
- protected void initFactory() {
- if (factory != null) {
- factory.destroy();
- }
-
- factory = new SourceFactory();
- }
-
- @Override
- protected List<UebTopicSource> buildTopics(Properties properties) {
- return factory.build(properties);
- }
-
- @Override
- protected UebTopicSource buildTopic(BusTopicParams params) {
- return factory.build(params);
- }
-
- @Override
- protected UebTopicSource buildTopic(List<String> servers, String topic) {
- return factory.build(servers, topic);
- }
-
- @Override
- protected void destroyFactory() {
- factory.destroy();
- }
-
- @Override
- protected void destroyTopic(String topic) {
- factory.destroy(topic);
- }
-
- @Override
- protected List<UebTopicSource> getInventory() {
- return factory.inventory();
- }
-
- @Override
- protected UebTopicSource getTopic(String topic) {
- return factory.get(topic);
- }
-
- @Override
- protected BusTopicParams getLastParams() {
- return factory.params.getLast();
- }
-
- @Override
- protected TopicPropertyBuilder makePropBuilder() {
- return new UebTopicPropertyBuilder(PROPERTY_UEB_SOURCE_TOPICS);
- }
-
- /**
- * Factory that records the parameters of all of the sources it creates.
- */
- private static class SourceFactory extends IndexedUebTopicSourceFactory {
- private Deque<BusTopicParams> params = new LinkedList<>();
-
- @Override
- protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
- params.add(busTopicParams);
- return super.makeSource(busTopicParams);
- }
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java
deleted file mode 100644
index 9ef8af84..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine - Common Modules
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.junit.Assert.assertNotNull;
-
-import org.junit.Test;
-
-public class UebTopicSourceTest {
-
- @Test
- public void test() {
- assertNotNull(UebTopicFactories.getSourceFactory());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
index 70fa83c6..2c33a257 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
@@ -23,42 +23,35 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.att.nsa.cambria.client.CambriaConsumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
-import org.apache.commons.collections4.IteratorUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.springframework.test.util.ReflectionTestUtils;
public class BusConsumerTest extends TopicTestBase {
@@ -68,11 +61,18 @@ public class BusConsumerTest extends TopicTestBase {
@Mock
KafkaConsumer<String, String> mockedKafkaConsumer;
+ AutoCloseable closeable;
+
@Before
@Override
public void setUp() {
super.setUp();
- MockitoAnnotations.initMocks(this);
+ closeable = MockitoAnnotations.openMocks(this);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
}
@@ -139,61 +139,6 @@ public class BusConsumerTest extends TopicTestBase {
}
@Test
- public void testCambriaConsumerWrapper() {
- // verify that different wrappers can be built
- new CambriaConsumerWrapper(makeBuilder().build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
- new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
- new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
- new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
- new CambriaConsumerWrapper(makeBuilder().userName(null).build());
- new CambriaConsumerWrapper(makeBuilder().password(null).build());
-
- assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
- .doesNotThrowAnyException();
- }
-
- @Test
- public void testCambriaConsumerWrapperFetch() throws Exception {
- CambriaConsumer inner = mock(CambriaConsumer.class);
- List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
- when(inner.fetch()).thenReturn(lst);
-
- CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
- ReflectionTestUtils.setField(cons, "consumer", inner);
-
- assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
-
- // arrange to throw exception next time fetch is called
- IOException ex = new IOException(EXPECTED);
- when(inner.fetch()).thenThrow(ex);
-
- cons.fetchTimeout = 10;
-
- try {
- cons.fetch();
- fail("missing exception");
-
- } catch (IOException e) {
- assertEquals(ex, e);
- }
- }
-
- @Test
- public void testCambriaConsumerWrapperClose() {
- CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
- assertThatCode(cons::close).doesNotThrowAnyException();
- }
-
- @Test
- public void testCambriaConsumerWrapperToString() {
- assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
- }
-
- @Test
public void testKafkaConsumerWrapper() {
// verify that different wrappers can be built
assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
deleted file mode 100644
index 66029b61..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import java.io.IOException;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.CambriaPublisherWrapper;
-
-
-public class BusPublisherTest extends TopicTestBase {
-
- @Before
- @Override
- public void setUp() {
- super.setUp();
- }
-
- @Test
- public void testCambriaPublisherWrapper() {
- // verify that different wrappers can be built
- new CambriaPublisherWrapper(makeBuilder().build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(false).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
- new CambriaPublisherWrapper(makeBuilder().apiKey(null).build());
- new CambriaPublisherWrapper(makeBuilder().apiSecret(null).build());
- new CambriaPublisherWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
- new CambriaPublisherWrapper(makeBuilder().userName(null).build());
- new CambriaPublisherWrapper(makeBuilder().password(null).build());
- assertThatCode(() -> new CambriaPublisherWrapper(makeBuilder().userName(null).password(null).build()))
- .doesNotThrowAnyException();
- }
-
- @Test
- public void testCambriaPublisherWrapperSend() throws Exception {
- CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = pub;
-
- assertTrue(cambria.send(MY_PARTITION, MY_MESSAGE));
-
- // publisher exception
- when(pub.send(anyString(), anyString())).thenThrow(new IOException(EXPECTED));
- assertFalse(cambria.send(MY_PARTITION2, MY_MESSAGE2));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testCambriaPublisherWrapperSend_InvalidMsg() {
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = mock(CambriaBatchingPublisher.class);
-
- cambria.send(MY_PARTITION, null);
- }
-
- @Test
- public void testCambriaPublisherWrapperClose() {
- CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = pub;
-
- cambria.close();
- verify(pub).close();
-
- // try again, this time with an exception
- doThrow(new RuntimeException(EXPECTED)).when(pub).close();
- cambria.close();
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
index 93e5067e..3abb8b10 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,8 +25,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.Arrays;
import java.util.LinkedList;
+import java.util.List;
import java.util.function.BiConsumer;
import org.junit.Before;
import org.junit.Test;
@@ -46,7 +47,7 @@ public class BusTopicParamsTest extends TopicTestBase {
assertEquals(addProps, params.getAdditionalProps());
assertEquals(MY_AFT_ENV, params.getAftEnvironment());
- assertEquals(true, params.isAllowSelfSignedCerts());
+ assertTrue(params.isAllowSelfSignedCerts());
assertEquals(MY_API_KEY, params.getApiKey());
assertEquals(MY_API_SECRET, params.getApiSecret());
assertEquals(MY_BASE_PATH, params.getBasePath());
@@ -59,7 +60,7 @@ public class BusTopicParamsTest extends TopicTestBase {
assertEquals(MY_HOST, params.getHostname());
assertEquals(MY_LAT, params.getLatitude());
assertEquals(MY_LONG, params.getLongitude());
- assertEquals(true, params.isManaged());
+ assertTrue(params.isManaged());
assertEquals(MY_PARTITION, params.getPartitionId());
assertEquals(MY_PARTNER, params.getPartner());
assertEquals(MY_PASS, params.getPassword());
@@ -67,16 +68,16 @@ public class BusTopicParamsTest extends TopicTestBase {
assertEquals(servers, params.getServers());
assertEquals(MY_TOPIC, params.getTopic());
assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
- assertEquals(true, params.isUseHttps());
+ assertTrue(params.isUseHttps());
assertEquals(MY_USERNAME, params.getUserName());
}
@Test
public void testBooleanGetters() {
// ensure that booleans are independent of each other
- testBoolean("true:false:false", (bldr, flag) -> bldr.allowSelfSignedCerts(flag));
- testBoolean("false:true:false", (bldr, flag) -> bldr.managed(flag));
- testBoolean("false:false:true", (bldr, flag) -> bldr.useHttps(flag));
+ testBoolean("true:false:false", TopicParamsBuilder::allowSelfSignedCerts);
+ testBoolean("false:true:false", TopicParamsBuilder::managed);
+ testBoolean("false:false:true", TopicParamsBuilder::useHttps);
}
@Test
@@ -124,15 +125,15 @@ public class BusTopicParamsTest extends TopicTestBase {
assertTrue(makeBuilder().port(65536).build().isPortInvalid());
assertTrue(makeBuilder().servers(null).build().isServersInvalid());
assertTrue(makeBuilder().servers(new LinkedList<>()).build().isServersInvalid());
- assertTrue(makeBuilder().servers(Arrays.asList("")).build().isServersInvalid());
- assertFalse(makeBuilder().servers(Arrays.asList("one-server")).build().isServersInvalid());
+ assertTrue(makeBuilder().servers(List.of("")).build().isServersInvalid());
+ assertFalse(makeBuilder().servers(List.of("one-server")).build().isServersInvalid());
assertTrue(makeBuilder().topic("").build().isTopicInvalid());
assertFalse(makeBuilder().userName("").build().isUserNameValid());
}
/**
* Tests the boolean methods by applying a function, once with {@code false} and once
- * with {@code true}. Verifies that all of the boolean methods return the correct
+ * with {@code true}. Verifies that all the boolean methods return the correct
* value by concatenating them.
*
* @param expectedTrue the string that is expected when {@code true} is passed to the
@@ -147,7 +148,7 @@ public class BusTopicParamsTest extends TopicTestBase {
BusTopicParams params = builder.build();
assertEquals("false:false:false",
- "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+ params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
// now try the "true" case
@@ -155,6 +156,6 @@ public class BusTopicParamsTest extends TopicTestBase {
params = builder.build();
assertEquals(expectedTrue,
- "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+ params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
index e6eec799..7aa70b2a 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,6 +33,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
+import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -124,7 +126,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
verify(pub).send(MY_PARTITION, MY_MESSAGE);
verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
- assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
+ assertEquals(List.of(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
// arrange for send to throw an exception
when(pub.send(anyString(), anyString())).thenThrow(new RuntimeException(EXPECTED));
@@ -138,8 +140,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
@Test(expected = IllegalArgumentException.class)
public void testSend_NullMessage() {
sink.start();
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send(null);
}
@@ -147,16 +148,14 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
@Test(expected = IllegalArgumentException.class)
public void testSend_EmptyMessage() {
sink.start();
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send("");
}
@Test(expected = IllegalStateException.class)
public void testSend_NotStarted() {
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send(MY_MESSAGE);
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java
deleted file mode 100644
index 674f379f..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.utils.gson.GsonTestUtils;
-
-public class InlineUebTopicSinkTest extends TopicTestBase {
- private InlineUebTopicSink sink;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- sink = new InlineUebTopicSink(makeBuilder().build());
- }
-
- @After
- public void tearDown() {
- sink.shutdown();
- }
-
- @Test
- public void testSerialize() {
- assertThatCode(() -> new GsonTestUtils().compareGson(sink, InlineUebTopicSinkTest.class))
- .doesNotThrowAnyException();
- }
-
- @Test
- public void testToString() {
- assertTrue(sink.toString().startsWith("InlineUebTopicSink ["));
- }
-
- @Test
- public void testInit() {
- assertThatCode(() -> sink.init()).doesNotThrowAnyException();
- }
-
- @Test
- public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.UEB, sink.getTopicCommInfrastructure());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java
deleted file mode 100644
index 6536d0e8..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.utils.gson.GsonTestUtils;
-
-public class SingleThreadedUebTopicSourceTest extends TopicTestBase {
- private SingleThreadedUebTopicSource source;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- source = new SingleThreadedUebTopicSource(makeBuilder().build());
- }
-
- @After
- public void tearDown() {
- source.shutdown();
- }
-
- @Test
- public void testSerialize() {
- assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedUebTopicSourceTest.class))
- .doesNotThrowAnyException();
- }
-
- @Test
- public void testToString() {
- assertTrue(source.toString().startsWith("SingleThreadedUebTopicSource ["));
- source.shutdown();
- }
-
- @Test
- public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.UEB, source.getTopicCommInfrastructure());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
index 2605c14b..704b2cb0 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
@@ -3,6 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -68,7 +69,7 @@ public class BidirectionalTopicClientTest {
private static final String SOURCE_TOPIC = "my-source-topic";
private static final String MY_TEXT = "my-text";
- private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB;
+ private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
@Mock
diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json
index 48380426..5b2e712a 100644
--- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json
+++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json
@@ -1,55 +1,30 @@
{
- "locked" : false,
- "alive" : false,
- "topicSources" : [ {
- "servers" : [ "my-server" ],
- "topic" : "ueb-source",
- "effectiveTopic" : "my-effective-topic",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "apiKey" : "my-api-key",
- "apiSecret" : "my-api-secret",
- "useHttps" : true,
- "allowTracing": false,
- "allowSelfSignedCerts" : true,
- "consumerGroup" : "${obj.topicSources[0].consumerGroup}",
- "consumerInstance" : "${obj.topicSources[0].consumerInstance}",
- "fetchTimeout" : 101,
- "fetchLimit" : 100,
- "topicCommInfrastructure" : "UEB"
- },
+ "locked": false,
+ "alive": false,
+ "topicSources": [
{
- "servers" : [ "my-server" ],
- "topic" : "noop-source",
- "effectiveTopic" : "noop-source",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "topicCommInfrastructure" : "NOOP"
- } ],
- "topicSinks" : [ {
- "servers" : [ "my-server" ],
- "topic" : "ueb-sink",
- "effectiveTopic" : "my-effective-topic",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "apiKey" : "my-api-key",
- "apiSecret" : "my-api-secret",
- "useHttps" : true,
- "allowTracing": false,
- "allowSelfSignedCerts" : true,
- "topicCommInfrastructure" : "UEB",
- "partitionKey" : "${obj.topicSinks[0].partitionKey}"
- },
+ "servers": [
+ "my-server"
+ ],
+ "topic": "noop-source",
+ "effectiveTopic": "noop-source",
+ "recentEvents": [],
+ "alive": false,
+ "locked": false,
+ "topicCommInfrastructure": "NOOP"
+ }
+ ],
+ "topicSinks": [
{
- "servers" : [ "my-server" ],
- "topic" : "noop-sink",
- "effectiveTopic" : "noop-sink",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "topicCommInfrastructure" : "NOOP"
- } ]
+ "servers": [
+ "my-server"
+ ],
+ "topic": "noop-sink",
+ "effectiveTopic": "noop-sink",
+ "recentEvents": [],
+ "alive": false,
+ "locked": false,
+ "topicCommInfrastructure": "NOOP"
+ }
+ ]
}
diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json
deleted file mode 100644
index 6dda9b9e..00000000
--- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
- "servers" : [ "svra", "svrb" ],
- "topic" : "my-topic",
- "effectiveTopic" : "my-effective-topic",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "apiKey" : "my-api-key",
- "apiSecret" : "my-api-secret",
- "useHttps" : true,
- "allowTracing": true,
- "allowSelfSignedCerts" : true,
- "topicCommInfrastructure" : "UEB",
- "partitionKey" : "my-partition"
-}
diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json
deleted file mode 100644
index 13ee6bc6..00000000
--- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
- "servers" : [ "svra", "svrb" ],
- "topic" : "my-topic",
- "effectiveTopic" : "my-effective-topic",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "apiKey" : "my-api-key",
- "apiSecret" : "my-api-secret",
- "useHttps" : true,
- "allowTracing": true,
- "allowSelfSignedCerts" : true,
- "consumerGroup" : "my-cons-group",
- "consumerInstance" : "my-cons-inst",
- "fetchTimeout" : 101,
- "fetchLimit" : 100,
- "topicCommInfrastructure" : "UEB"
-}