aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java261
1 files changed, 82 insertions, 179 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
index 9aabad52..d9e55ddd 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
@@ -2,7 +2,8 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,21 +21,19 @@
package org.onap.policy.common.endpoints.event.comm;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
+import lombok.Getter;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
@@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory;
* This implementation of the Topic Endpoint Manager, proxies operations to the appropriate
* implementation(s).
*/
+@Getter
class TopicEndpointProxy implements TopicEndpoint {
/**
* Logger.
@@ -71,9 +71,9 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<Topic> addTopics(TopicParameterGroup params) {
List<TopicParameters> sinks =
- (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList());
+ (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList());
List<TopicParameters> sources =
- (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList());
+ (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList());
List<Topic> topics = new ArrayList<>(sinks.size() + sources.size());
topics.addAll(addTopicSources(sources));
@@ -87,18 +87,15 @@ class TopicEndpointProxy implements TopicEndpoint {
for (TopicParameters param : paramList) {
switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
- case UEB:
- sources.add(UebTopicFactories.getSourceFactory().build(param));
- break;
- case DMAAP:
- sources.add(DmaapTopicFactories.getSourceFactory().build(param));
+ case KAFKA:
+ sources.add(KafkaTopicFactories.getSourceFactory().build(param));
break;
case NOOP:
sources.add(NoopTopicFactories.getSourceFactory().build(param));
break;
default:
logger.debug("Unknown source type {} for topic: {}", param.getTopicCommInfrastructure(),
- param.getTopic());
+ param.getTopic());
break;
}
}
@@ -111,14 +108,12 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSource> addTopicSources(Properties properties) {
- // 1. Create UEB Sources
- // 2. Create DMAAP Sources
- // 3. Create NOOP Sources
+ // 1. Create KAFKA Sources
+ // 2. Create NOOP Sources
List<TopicSource> sources = new ArrayList<>();
- sources.addAll(UebTopicFactories.getSourceFactory().build(properties));
- sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties));
+ sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties));
sources.addAll(NoopTopicFactories.getSourceFactory().build(properties));
lockSources(sources);
@@ -138,18 +133,15 @@ class TopicEndpointProxy implements TopicEndpoint {
for (TopicParameters param : paramList) {
switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
- case UEB:
- sinks.add(UebTopicFactories.getSinkFactory().build(param));
- break;
- case DMAAP:
- sinks.add(DmaapTopicFactories.getSinkFactory().build(param));
+ case KAFKA:
+ sinks.add(KafkaTopicFactories.getSinkFactory().build(param));
break;
case NOOP:
sinks.add(NoopTopicFactories.getSinkFactory().build(param));
break;
default:
logger.debug("Unknown sink type {} for topic: {}", param.getTopicCommInfrastructure(),
- param.getTopic());
+ param.getTopic());
break;
}
}
@@ -161,14 +153,12 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSink> addTopicSinks(Properties properties) {
- // 1. Create UEB Sinks
- // 2. Create DMAAP Sinks
- // 3. Create NOOP Sinks
+ // 1. Create KAFKA Sinks
+ // 2. Create NOOP Sinks
final List<TopicSink> sinks = new ArrayList<>();
- sinks.addAll(UebTopicFactories.getSinkFactory().build(properties));
- sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties));
+ sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties));
sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties));
lockSinks(sinks);
@@ -187,8 +177,7 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSource> sources = new ArrayList<>();
- sources.addAll(UebTopicFactories.getSourceFactory().inventory());
- sources.addAll(DmaapTopicFactories.getSourceFactory().inventory());
+ sources.addAll(KafkaTopicFactories.getSourceFactory().inventory());
sources.addAll(NoopTopicFactories.getSourceFactory().inventory());
return sources;
@@ -202,34 +191,21 @@ class TopicEndpointProxy implements TopicEndpoint {
}
final List<TopicSource> sources = new ArrayList<>();
- for (final String topic : topicNames) {
- try {
- final TopicSource uebSource = this.getUebTopicSource(topic);
- if (uebSource != null) {
- sources.add(uebSource);
- }
- } catch (final Exception e) {
- logger.debug("No UEB source for topic: {}", topic, e);
- }
+ topicNames.forEach(topic -> {
try {
- final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
- if (dmaapSource != null) {
- sources.add(dmaapSource);
- }
+ sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic)));
} catch (final Exception e) {
- logger.debug("No DMAAP source for topic: {}", topic, e);
+ logger.debug("No KAFKA source for topic: {}", topic, e);
}
try {
- final TopicSource noopSource = this.getNoopTopicSource(topic);
- if (noopSource != null) {
- sources.add(noopSource);
- }
+ sources.add(Objects.requireNonNull(this.getNoopTopicSource(topic)));
} catch (final Exception e) {
logger.debug("No NOOP source for topic: {}", topic, e);
}
- }
+ });
+
return sources;
}
@@ -238,8 +214,7 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
- sinks.addAll(UebTopicFactories.getSinkFactory().inventory());
- sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory());
+ sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory());
sinks.addAll(NoopTopicFactories.getSinkFactory().inventory());
return sinks;
@@ -255,28 +230,13 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
for (final String topic : topicNames) {
try {
- final TopicSink uebSink = this.getUebTopicSink(topic);
- if (uebSink != null) {
- sinks.add(uebSink);
- }
+ sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic)));
} catch (final Exception e) {
- logger.debug("No UEB sink for topic: {}", topic, e);
+ logger.debug("No KAFKA sink for topic: {}", topic, e);
}
try {
- final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
- if (dmaapSink != null) {
- sinks.add(dmaapSink);
- }
- } catch (final Exception e) {
- logger.debug("No DMAAP sink for topic: {}", topic, e);
- }
-
- try {
- final TopicSink noopSink = this.getNoopTopicSink(topic);
- if (noopSink != null) {
- sinks.add(noopSink);
- }
+ sinks.add(Objects.requireNonNull(this.getNoopTopicSink(topic)));
} catch (final Exception e) {
logger.debug("No NOOP sink for topic: {}", topic, e);
}
@@ -287,19 +247,13 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSink> getTopicSinks(String topicName) {
if (topicName == null) {
- throw parmException(null);
+ throw paramException(null);
}
final List<TopicSink> sinks = new ArrayList<>();
try {
- sinks.add(this.getUebTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
- sinks.add(this.getDmaapTopicSink(topicName));
+ sinks.add(this.getKafkaTopicSink(topicName));
} catch (final Exception e) {
logNoSink(topicName, e);
}
@@ -313,42 +267,24 @@ class TopicEndpointProxy implements TopicEndpoint {
return sinks;
}
- @JsonIgnore
- @GsonJsonIgnore
- @Override
- public List<UebTopicSource> getUebTopicSources() {
- return UebTopicFactories.getSourceFactory().inventory();
- }
-
- @JsonIgnore
@GsonJsonIgnore
@Override
- public List<DmaapTopicSource> getDmaapTopicSources() {
- return DmaapTopicFactories.getSourceFactory().inventory();
+ public List<KafkaTopicSource> getKafkaTopicSources() {
+ return KafkaTopicFactories.getSourceFactory().inventory();
}
- @JsonIgnore
@GsonJsonIgnore
@Override
public List<NoopTopicSource> getNoopTopicSources() {
return NoopTopicFactories.getSourceFactory().inventory();
}
- @JsonIgnore
- @GsonJsonIgnore
@Override
- public List<UebTopicSink> getUebTopicSinks() {
- return UebTopicFactories.getSinkFactory().inventory();
- }
-
- @JsonIgnore
@GsonJsonIgnore
- @Override
- public List<DmaapTopicSink> getDmaapTopicSinks() {
- return DmaapTopicFactories.getSinkFactory().inventory();
+ public List<KafkaTopicSink> getKafkaTopicSinks() {
+ return KafkaTopicFactories.getSinkFactory().inventory();
}
- @JsonIgnore
@GsonJsonIgnore
@Override
public List<NoopTopicSink> getNoopTopicSinks() {
@@ -372,7 +308,7 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<Startable> endpoints = this.getEndpoints();
- boolean success = true;
+ var success = true;
for (final Startable endpoint : endpoints) {
try {
success = endpoint.start() && success;
@@ -398,7 +334,7 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<Startable> endpoints = this.getEndpoints();
- boolean success = true;
+ var success = true;
for (final Startable endpoint : endpoints) {
try {
success = endpoint.stop() && success;
@@ -416,7 +352,6 @@ class TopicEndpointProxy implements TopicEndpoint {
*
* @return list of managed endpoints
*/
- @JsonIgnore
@GsonJsonIgnore
protected List<Startable> getEndpoints() {
final List<Startable> endpoints = new ArrayList<>();
@@ -431,11 +366,8 @@ class TopicEndpointProxy implements TopicEndpoint {
public void shutdown() {
this.stop();
- UebTopicFactories.getSourceFactory().destroy();
- UebTopicFactories.getSinkFactory().destroy();
-
- DmaapTopicFactories.getSourceFactory().destroy();
- DmaapTopicFactories.getSinkFactory().destroy();
+ KafkaTopicFactories.getSourceFactory().destroy();
+ KafkaTopicFactories.getSinkFactory().destroy();
NoopTopicFactories.getSinkFactory().destroy();
NoopTopicFactories.getSourceFactory().destroy();
@@ -443,27 +375,22 @@ class TopicEndpointProxy implements TopicEndpoint {
}
@Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
public boolean lock() {
+ boolean shouldLock;
synchronized (this) {
- if (this.locked) {
- return true;
- }
-
+ shouldLock = !this.locked;
this.locked = true;
}
- for (final TopicSource source : this.getTopicSources()) {
- source.lock();
- }
+ if (shouldLock) {
+ for (final TopicSource source : this.getTopicSources()) {
+ source.lock();
+ }
- for (final TopicSink sink : this.getTopicSinks()) {
- sink.lock();
+ for (final TopicSink sink : this.getTopicSinks()) {
+ sink.lock();
+ }
}
return true;
@@ -471,88 +398,64 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public boolean unlock() {
- synchronized (this) {
- if (!this.locked) {
- return true;
- }
+ boolean shouldUnlock;
+ synchronized (this) {
+ shouldUnlock = this.locked;
this.locked = false;
}
- for (final TopicSource source : this.getTopicSources()) {
- source.unlock();
- }
+ if (shouldUnlock) {
+ for (final TopicSource source : this.getTopicSources()) {
+ source.unlock();
+ }
- for (final TopicSink sink : this.getTopicSinks()) {
- sink.unlock();
+ for (final TopicSink sink : this.getTopicSinks()) {
+ sink.unlock();
+ }
}
return true;
}
@Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) {
if (commType == null) {
- throw parmException(topicName);
+ throw paramException(topicName);
}
if (topicName == null) {
- throw parmException(null);
+ throw paramException(null);
}
- switch (commType) {
- case UEB:
- return this.getUebTopicSource(topicName);
- case DMAAP:
- return this.getDmaapTopicSource(topicName);
- case NOOP:
- return this.getNoopTopicSource(topicName);
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
+ return switch (commType) {
+ case KAFKA -> this.getKafkaTopicSource(topicName);
+ case NOOP -> this.getNoopTopicSource(topicName);
+ default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
+ };
}
@Override
public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) {
if (commType == null) {
- throw parmException(topicName);
+ throw paramException(topicName);
}
if (topicName == null) {
- throw parmException(null);
+ throw paramException(null);
}
- switch (commType) {
- case UEB:
- return this.getUebTopicSink(topicName);
- case DMAAP:
- return this.getDmaapTopicSink(topicName);
- case NOOP:
- return this.getNoopTopicSink(topicName);
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
- }
-
- @Override
- public UebTopicSource getUebTopicSource(String topicName) {
- return UebTopicFactories.getSourceFactory().get(topicName);
- }
-
- @Override
- public UebTopicSink getUebTopicSink(String topicName) {
- return UebTopicFactories.getSinkFactory().get(topicName);
+ return switch (commType) {
+ case KAFKA -> this.getKafkaTopicSink(topicName);
+ case NOOP -> this.getNoopTopicSink(topicName);
+ default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
+ };
}
@Override
- public DmaapTopicSource getDmaapTopicSource(String topicName) {
- return DmaapTopicFactories.getSourceFactory().get(topicName);
+ public KafkaTopicSource getKafkaTopicSource(String topicName) {
+ return KafkaTopicFactories.getSourceFactory().get(topicName);
}
@Override
@@ -561,8 +464,8 @@ class TopicEndpointProxy implements TopicEndpoint {
}
@Override
- public DmaapTopicSink getDmaapTopicSink(String topicName) {
- return DmaapTopicFactories.getSinkFactory().get(topicName);
+ public KafkaTopicSink getKafkaTopicSink(String topicName) {
+ return KafkaTopicFactories.getSinkFactory().get(topicName);
}
@Override
@@ -570,7 +473,7 @@ class TopicEndpointProxy implements TopicEndpoint {
return NoopTopicFactories.getSinkFactory().get(topicName);
}
- private IllegalArgumentException parmException(String topicName) {
+ private IllegalArgumentException paramException(String topicName) {
return new IllegalArgumentException(
"Invalid parameter: a communication infrastructure required to fetch " + topicName);
}
@@ -579,4 +482,4 @@ class TopicEndpointProxy implements TopicEndpoint {
logger.debug("No sink for topic: {}", topicName, ex);
}
-} \ No newline at end of file
+}