aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java73
1 files changed, 71 insertions, 2 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
index 293bf608..d37410e9 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
@@ -3,6 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +30,9 @@ import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories;
import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
@@ -95,6 +99,9 @@ class TopicEndpointProxy implements TopicEndpoint {
case DMAAP:
sources.add(DmaapTopicFactories.getSourceFactory().build(param));
break;
+ case KAFKA:
+ sources.add(KafkaTopicFactories.getSourceFactory().build(param));
+ break;
case NOOP:
sources.add(NoopTopicFactories.getSourceFactory().build(param));
break;
@@ -115,12 +122,14 @@ class TopicEndpointProxy implements TopicEndpoint {
// 1. Create UEB Sources
// 2. Create DMAAP Sources
- // 3. Create NOOP Sources
+ // 3. Create KAFKA Sources
+ // 4. Create NOOP Sources
List<TopicSource> sources = new ArrayList<>();
sources.addAll(UebTopicFactories.getSourceFactory().build(properties));
sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties));
+ sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties));
sources.addAll(NoopTopicFactories.getSourceFactory().build(properties));
lockSources(sources);
@@ -146,6 +155,9 @@ class TopicEndpointProxy implements TopicEndpoint {
case DMAAP:
sinks.add(DmaapTopicFactories.getSinkFactory().build(param));
break;
+ case KAFKA:
+ sinks.add(KafkaTopicFactories.getSinkFactory().build(param));
+ break;
case NOOP:
sinks.add(NoopTopicFactories.getSinkFactory().build(param));
break;
@@ -165,12 +177,14 @@ class TopicEndpointProxy implements TopicEndpoint {
public List<TopicSink> addTopicSinks(Properties properties) {
// 1. Create UEB Sinks
// 2. Create DMAAP Sinks
- // 3. Create NOOP Sinks
+ // 3. Create KAFKA Sinks
+ // 4. Create NOOP Sinks
final List<TopicSink> sinks = new ArrayList<>();
sinks.addAll(UebTopicFactories.getSinkFactory().build(properties));
sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties));
+ sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties));
sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties));
lockSinks(sinks);
@@ -191,6 +205,7 @@ class TopicEndpointProxy implements TopicEndpoint {
sources.addAll(UebTopicFactories.getSourceFactory().inventory());
sources.addAll(DmaapTopicFactories.getSourceFactory().inventory());
+ sources.addAll(KafkaTopicFactories.getSourceFactory().inventory());
sources.addAll(NoopTopicFactories.getSourceFactory().inventory());
return sources;
@@ -224,6 +239,15 @@ class TopicEndpointProxy implements TopicEndpoint {
}
try {
+ final TopicSource kafkaSource = this.getKafkaTopicSource(topic);
+ if (kafkaSource != null) {
+ sources.add(kafkaSource);
+ }
+ } catch (final Exception e) {
+ logger.debug("No KAFKA source for topic: {}", topic, e);
+ }
+
+ try {
final TopicSource noopSource = this.getNoopTopicSource(topic);
if (noopSource != null) {
sources.add(noopSource);
@@ -242,6 +266,7 @@ class TopicEndpointProxy implements TopicEndpoint {
sinks.addAll(UebTopicFactories.getSinkFactory().inventory());
sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory());
+ sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory());
sinks.addAll(NoopTopicFactories.getSinkFactory().inventory());
return sinks;
@@ -275,6 +300,15 @@ class TopicEndpointProxy implements TopicEndpoint {
}
try {
+ final TopicSink kafkaSink = this.getKafkaTopicSink(topic);
+ if (kafkaSink != null) {
+ sinks.add(kafkaSink);
+ }
+ } catch (final Exception e) {
+ logger.debug("No KAFKA sink for topic: {}", topic, e);
+ }
+
+ try {
final TopicSink noopSink = this.getNoopTopicSink(topic);
if (noopSink != null) {
sinks.add(noopSink);
@@ -307,6 +341,12 @@ class TopicEndpointProxy implements TopicEndpoint {
}
try {
+ sinks.add(this.getKafkaTopicSink(topicName));
+ } catch (final Exception e) {
+ logNoSink(topicName, e);
+ }
+
+ try {
sinks.add(this.getNoopTopicSink(topicName));
} catch (final Exception e) {
logNoSink(topicName, e);
@@ -329,6 +369,12 @@ class TopicEndpointProxy implements TopicEndpoint {
@GsonJsonIgnore
@Override
+ public List<KafkaTopicSource> getKafkaTopicSources() {
+ return KafkaTopicFactories.getSourceFactory().inventory();
+ }
+
+ @GsonJsonIgnore
+ @Override
public List<NoopTopicSource> getNoopTopicSources() {
return NoopTopicFactories.getSourceFactory().inventory();
}
@@ -345,6 +391,12 @@ class TopicEndpointProxy implements TopicEndpoint {
return DmaapTopicFactories.getSinkFactory().inventory();
}
+ @Override
+ @GsonJsonIgnore
+ public List<KafkaTopicSink> getKafkaTopicSinks() {
+ return KafkaTopicFactories.getSinkFactory().inventory();
+ }
+
@GsonJsonIgnore
@Override
public List<NoopTopicSink> getNoopTopicSinks() {
@@ -432,6 +484,9 @@ class TopicEndpointProxy implements TopicEndpoint {
DmaapTopicFactories.getSourceFactory().destroy();
DmaapTopicFactories.getSinkFactory().destroy();
+ KafkaTopicFactories.getSourceFactory().destroy();
+ KafkaTopicFactories.getSinkFactory().destroy();
+
NoopTopicFactories.getSinkFactory().destroy();
NoopTopicFactories.getSourceFactory().destroy();
@@ -497,6 +552,8 @@ class TopicEndpointProxy implements TopicEndpoint {
return this.getUebTopicSource(topicName);
case DMAAP:
return this.getDmaapTopicSource(topicName);
+ case KAFKA:
+ return this.getKafkaTopicSource(topicName);
case NOOP:
return this.getNoopTopicSource(topicName);
default:
@@ -519,6 +576,8 @@ class TopicEndpointProxy implements TopicEndpoint {
return this.getUebTopicSink(topicName);
case DMAAP:
return this.getDmaapTopicSink(topicName);
+ case KAFKA:
+ return this.getKafkaTopicSink(topicName);
case NOOP:
return this.getNoopTopicSink(topicName);
default:
@@ -542,6 +601,11 @@ class TopicEndpointProxy implements TopicEndpoint {
}
@Override
+ public KafkaTopicSource getKafkaTopicSource(String topicName) {
+ return KafkaTopicFactories.getSourceFactory().get(topicName);
+ }
+
+ @Override
public NoopTopicSource getNoopTopicSource(String topicName) {
return NoopTopicFactories.getSourceFactory().get(topicName);
}
@@ -552,6 +616,11 @@ class TopicEndpointProxy implements TopicEndpoint {
}
@Override
+ public KafkaTopicSink getKafkaTopicSink(String topicName) {
+ return KafkaTopicFactories.getSinkFactory().get(topicName);
+ }
+
+ @Override
public NoopTopicSink getNoopTopicSink(String topicName) {
return NoopTopicFactories.getSinkFactory().get(topicName);
}