aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java110
1 files changed, 39 insertions, 71 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
index d37410e9..5ba32b28 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
@@ -3,7 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2022 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ package org.onap.policy.common.endpoints.event.comm;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
import lombok.Getter;
import org.onap.policy.common.capabilities.Startable;
@@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory;
* This implementation of the Topic Endpoint Manager, proxies operations to the appropriate
* implementation(s).
*/
+@Getter
class TopicEndpointProxy implements TopicEndpoint {
/**
* Logger.
@@ -58,13 +60,11 @@ class TopicEndpointProxy implements TopicEndpoint {
/**
* Is this element locked boolean.
*/
- @Getter
private volatile boolean locked = false;
/**
* Is this element alive boolean.
*/
- @Getter
private volatile boolean alive = false;
@Override
@@ -77,9 +77,9 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<Topic> addTopics(TopicParameterGroup params) {
List<TopicParameters> sinks =
- (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList());
+ (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList());
List<TopicParameters> sources =
- (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList());
+ (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList());
List<Topic> topics = new ArrayList<>(sinks.size() + sources.size());
topics.addAll(addTopicSources(sources));
@@ -107,7 +107,7 @@ class TopicEndpointProxy implements TopicEndpoint {
break;
default:
logger.debug("Unknown source type {} for topic: {}", param.getTopicCommInfrastructure(),
- param.getTopic());
+ param.getTopic());
break;
}
}
@@ -163,7 +163,7 @@ class TopicEndpointProxy implements TopicEndpoint {
break;
default:
logger.debug("Unknown sink type {} for topic: {}", param.getTopicCommInfrastructure(),
- param.getTopic());
+ param.getTopic());
break;
}
}
@@ -219,43 +219,33 @@ class TopicEndpointProxy implements TopicEndpoint {
}
final List<TopicSource> sources = new ArrayList<>();
- for (final String topic : topicNames) {
+
+ topicNames.forEach(topic -> {
try {
- final TopicSource uebSource = this.getUebTopicSource(topic);
- if (uebSource != null) {
- sources.add(uebSource);
- }
+ sources.add(Objects.requireNonNull(this.getUebTopicSource(topic)));
} catch (final Exception e) {
logger.debug("No UEB source for topic: {}", topic, e);
}
try {
- final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
- if (dmaapSource != null) {
- sources.add(dmaapSource);
- }
+ sources.add(Objects.requireNonNull(this.getDmaapTopicSource(topic)));
} catch (final Exception e) {
logger.debug("No DMAAP source for topic: {}", topic, e);
}
try {
- final TopicSource kafkaSource = this.getKafkaTopicSource(topic);
- if (kafkaSource != null) {
- sources.add(kafkaSource);
- }
+ sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic)));
} catch (final Exception e) {
logger.debug("No KAFKA source for topic: {}", topic, e);
}
try {
- final TopicSource noopSource = this.getNoopTopicSource(topic);
- if (noopSource != null) {
- sources.add(noopSource);
- }
+ sources.add(Objects.requireNonNull(this.getNoopTopicSource(topic)));
} catch (final Exception e) {
logger.debug("No NOOP source for topic: {}", topic, e);
}
- }
+ });
+
return sources;
}
@@ -282,37 +272,25 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
for (final String topic : topicNames) {
try {
- final TopicSink uebSink = this.getUebTopicSink(topic);
- if (uebSink != null) {
- sinks.add(uebSink);
- }
+ sinks.add(Objects.requireNonNull(this.getUebTopicSink(topic)));
} catch (final Exception e) {
logger.debug("No UEB sink for topic: {}", topic, e);
}
try {
- final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
- if (dmaapSink != null) {
- sinks.add(dmaapSink);
- }
+ sinks.add(Objects.requireNonNull(this.getDmaapTopicSink(topic)));
} catch (final Exception e) {
logger.debug("No DMAAP sink for topic: {}", topic, e);
}
try {
- final TopicSink kafkaSink = this.getKafkaTopicSink(topic);
- if (kafkaSink != null) {
- sinks.add(kafkaSink);
- }
+ sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic)));
} catch (final Exception e) {
logger.debug("No KAFKA sink for topic: {}", topic, e);
}
try {
- final TopicSink noopSink = this.getNoopTopicSink(topic);
- if (noopSink != null) {
- sinks.add(noopSink);
- }
+ sinks.add(Objects.requireNonNull(this.getNoopTopicSink(topic)));
} catch (final Exception e) {
logger.debug("No NOOP sink for topic: {}", topic, e);
}
@@ -323,7 +301,7 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSink> getTopicSinks(String topicName) {
if (topicName == null) {
- throw parmException(null);
+ throw paramException(null);
}
final List<TopicSink> sinks = new ArrayList<>();
@@ -540,49 +518,39 @@ class TopicEndpointProxy implements TopicEndpoint {
public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) {
if (commType == null) {
- throw parmException(topicName);
+ throw paramException(topicName);
}
if (topicName == null) {
- throw parmException(null);
+ throw paramException(null);
}
- switch (commType) {
- case UEB:
- return this.getUebTopicSource(topicName);
- case DMAAP:
- return this.getDmaapTopicSource(topicName);
- case KAFKA:
- return this.getKafkaTopicSource(topicName);
- case NOOP:
- return this.getNoopTopicSource(topicName);
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
+ return switch (commType) {
+ case UEB -> this.getUebTopicSource(topicName);
+ case DMAAP -> this.getDmaapTopicSource(topicName);
+ case KAFKA -> this.getKafkaTopicSource(topicName);
+ case NOOP -> this.getNoopTopicSource(topicName);
+ default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
+ };
}
@Override
public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) {
if (commType == null) {
- throw parmException(topicName);
+ throw paramException(topicName);
}
if (topicName == null) {
- throw parmException(null);
+ throw paramException(null);
}
- switch (commType) {
- case UEB:
- return this.getUebTopicSink(topicName);
- case DMAAP:
- return this.getDmaapTopicSink(topicName);
- case KAFKA:
- return this.getKafkaTopicSink(topicName);
- case NOOP:
- return this.getNoopTopicSink(topicName);
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
+ return switch (commType) {
+ case UEB -> this.getUebTopicSink(topicName);
+ case DMAAP -> this.getDmaapTopicSink(topicName);
+ case KAFKA -> this.getKafkaTopicSink(topicName);
+ case NOOP -> this.getNoopTopicSink(topicName);
+ default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
+ };
}
@Override
@@ -625,7 +593,7 @@ class TopicEndpointProxy implements TopicEndpoint {
return NoopTopicFactories.getSinkFactory().get(topicName);
}
- private IllegalArgumentException parmException(String topicName) {
+ private IllegalArgumentException paramException(String topicName) {
return new IllegalArgumentException(
"Invalid parameter: a communication infrastructure required to fetch " + topicName);
}