diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java | 73 |
1 files changed, 71 insertions, 2 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 293bf608..d37410e9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +30,9 @@ import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; @@ -95,6 +99,9 @@ class TopicEndpointProxy implements TopicEndpoint { case DMAAP: sources.add(DmaapTopicFactories.getSourceFactory().build(param)); break; + case KAFKA: + sources.add(KafkaTopicFactories.getSourceFactory().build(param)); + break; case NOOP: sources.add(NoopTopicFactories.getSourceFactory().build(param)); break; @@ -115,12 +122,14 @@ class TopicEndpointProxy implements TopicEndpoint { // 1. Create UEB Sources // 2. Create DMAAP Sources - // 3. Create NOOP Sources + // 3. Create KAFKA Sources + // 4. Create NOOP Sources List<TopicSource> sources = new ArrayList<>(); sources.addAll(UebTopicFactories.getSourceFactory().build(properties)); sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties)); + sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); lockSources(sources); @@ -146,6 +155,9 @@ class TopicEndpointProxy implements TopicEndpoint { case DMAAP: sinks.add(DmaapTopicFactories.getSinkFactory().build(param)); break; + case KAFKA: + sinks.add(KafkaTopicFactories.getSinkFactory().build(param)); + break; case NOOP: sinks.add(NoopTopicFactories.getSinkFactory().build(param)); break; @@ -165,12 +177,14 @@ class TopicEndpointProxy implements TopicEndpoint { public List<TopicSink> addTopicSinks(Properties properties) { // 1. Create UEB Sinks // 2. Create DMAAP Sinks - // 3. Create NOOP Sinks + // 3. Create KAFKA Sinks + // 4. Create NOOP Sinks final List<TopicSink> sinks = new ArrayList<>(); sinks.addAll(UebTopicFactories.getSinkFactory().build(properties)); sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties)); + sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); lockSinks(sinks); @@ -191,6 +205,7 @@ class TopicEndpointProxy implements TopicEndpoint { sources.addAll(UebTopicFactories.getSourceFactory().inventory()); sources.addAll(DmaapTopicFactories.getSourceFactory().inventory()); + sources.addAll(KafkaTopicFactories.getSourceFactory().inventory()); sources.addAll(NoopTopicFactories.getSourceFactory().inventory()); return sources; @@ -224,6 +239,15 @@ class TopicEndpointProxy implements TopicEndpoint { } try { + final TopicSource kafkaSource = this.getKafkaTopicSource(topic); + if (kafkaSource != null) { + sources.add(kafkaSource); + } + } catch (final Exception e) { + logger.debug("No KAFKA source for topic: {}", topic, e); + } + + try { final TopicSource noopSource = this.getNoopTopicSource(topic); if (noopSource != null) { sources.add(noopSource); @@ -242,6 +266,7 @@ class TopicEndpointProxy implements TopicEndpoint { sinks.addAll(UebTopicFactories.getSinkFactory().inventory()); sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory()); + sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory()); sinks.addAll(NoopTopicFactories.getSinkFactory().inventory()); return sinks; @@ -275,6 +300,15 @@ class TopicEndpointProxy implements TopicEndpoint { } try { + final TopicSink kafkaSink = this.getKafkaTopicSink(topic); + if (kafkaSink != null) { + sinks.add(kafkaSink); + } + } catch (final Exception e) { + logger.debug("No KAFKA sink for topic: {}", topic, e); + } + + try { final TopicSink noopSink = this.getNoopTopicSink(topic); if (noopSink != null) { sinks.add(noopSink); @@ -307,6 +341,12 @@ class TopicEndpointProxy implements TopicEndpoint { } try { + sinks.add(this.getKafkaTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { sinks.add(this.getNoopTopicSink(topicName)); } catch (final Exception e) { logNoSink(topicName, e); @@ -329,6 +369,12 @@ class TopicEndpointProxy implements TopicEndpoint { @GsonJsonIgnore @Override + public List<KafkaTopicSource> getKafkaTopicSources() { + return KafkaTopicFactories.getSourceFactory().inventory(); + } + + @GsonJsonIgnore + @Override public List<NoopTopicSource> getNoopTopicSources() { return NoopTopicFactories.getSourceFactory().inventory(); } @@ -345,6 +391,12 @@ class TopicEndpointProxy implements TopicEndpoint { return DmaapTopicFactories.getSinkFactory().inventory(); } + @Override + @GsonJsonIgnore + public List<KafkaTopicSink> getKafkaTopicSinks() { + return KafkaTopicFactories.getSinkFactory().inventory(); + } + @GsonJsonIgnore @Override public List<NoopTopicSink> getNoopTopicSinks() { @@ -432,6 +484,9 @@ class TopicEndpointProxy implements TopicEndpoint { DmaapTopicFactories.getSourceFactory().destroy(); DmaapTopicFactories.getSinkFactory().destroy(); + KafkaTopicFactories.getSourceFactory().destroy(); + KafkaTopicFactories.getSinkFactory().destroy(); + NoopTopicFactories.getSinkFactory().destroy(); NoopTopicFactories.getSourceFactory().destroy(); @@ -497,6 +552,8 @@ class TopicEndpointProxy implements TopicEndpoint { return this.getUebTopicSource(topicName); case DMAAP: return this.getDmaapTopicSource(topicName); + case KAFKA: + return this.getKafkaTopicSource(topicName); case NOOP: return this.getNoopTopicSource(topicName); default: @@ -519,6 +576,8 @@ class TopicEndpointProxy implements TopicEndpoint { return this.getUebTopicSink(topicName); case DMAAP: return this.getDmaapTopicSink(topicName); + case KAFKA: + return this.getKafkaTopicSink(topicName); case NOOP: return this.getNoopTopicSink(topicName); default: @@ -542,6 +601,11 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override + public KafkaTopicSource getKafkaTopicSource(String topicName) { + return KafkaTopicFactories.getSourceFactory().get(topicName); + } + + @Override public NoopTopicSource getNoopTopicSource(String topicName) { return NoopTopicFactories.getSourceFactory().get(topicName); } @@ -552,6 +616,11 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override + public KafkaTopicSink getKafkaTopicSink(String topicName) { + return KafkaTopicFactories.getSinkFactory().get(topicName); + } + + @Override public NoopTopicSink getNoopTopicSink(String topicName) { return NoopTopicFactories.getSinkFactory().get(topicName); } |