diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java | 261 |
1 files changed, 82 insertions, 179 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 9aabad52..d9e55ddd 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,21 +21,19 @@ package org.onap.policy.common.endpoints.event.comm; -import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Properties; +import lombok.Getter; import org.onap.policy.common.capabilities.Startable; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; @@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory; * This implementation of the Topic Endpoint Manager, proxies operations to the appropriate * implementation(s). */ +@Getter class TopicEndpointProxy implements TopicEndpoint { /** * Logger. @@ -71,9 +71,9 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<Topic> addTopics(TopicParameterGroup params) { List<TopicParameters> sinks = - (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList()); + (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList()); List<TopicParameters> sources = - (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList()); + (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList()); List<Topic> topics = new ArrayList<>(sinks.size() + sources.size()); topics.addAll(addTopicSources(sources)); @@ -87,18 +87,15 @@ class TopicEndpointProxy implements TopicEndpoint { for (TopicParameters param : paramList) { switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { - case UEB: - sources.add(UebTopicFactories.getSourceFactory().build(param)); - break; - case DMAAP: - sources.add(DmaapTopicFactories.getSourceFactory().build(param)); + case KAFKA: + sources.add(KafkaTopicFactories.getSourceFactory().build(param)); break; case NOOP: sources.add(NoopTopicFactories.getSourceFactory().build(param)); break; default: logger.debug("Unknown source type {} for topic: {}", param.getTopicCommInfrastructure(), - param.getTopic()); + param.getTopic()); break; } } @@ -111,14 +108,12 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSource> addTopicSources(Properties properties) { - // 1. Create UEB Sources - // 2. Create DMAAP Sources - // 3. Create NOOP Sources + // 1. Create KAFKA Sources + // 2. Create NOOP Sources List<TopicSource> sources = new ArrayList<>(); - sources.addAll(UebTopicFactories.getSourceFactory().build(properties)); - sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties)); + sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); lockSources(sources); @@ -138,18 +133,15 @@ class TopicEndpointProxy implements TopicEndpoint { for (TopicParameters param : paramList) { switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { - case UEB: - sinks.add(UebTopicFactories.getSinkFactory().build(param)); - break; - case DMAAP: - sinks.add(DmaapTopicFactories.getSinkFactory().build(param)); + case KAFKA: + sinks.add(KafkaTopicFactories.getSinkFactory().build(param)); break; case NOOP: sinks.add(NoopTopicFactories.getSinkFactory().build(param)); break; default: logger.debug("Unknown sink type {} for topic: {}", param.getTopicCommInfrastructure(), - param.getTopic()); + param.getTopic()); break; } } @@ -161,14 +153,12 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - // 3. Create NOOP Sinks + // 1. Create KAFKA Sinks + // 2. Create NOOP Sinks final List<TopicSink> sinks = new ArrayList<>(); - sinks.addAll(UebTopicFactories.getSinkFactory().build(properties)); - sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties)); + sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); lockSinks(sinks); @@ -187,8 +177,7 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSource> sources = new ArrayList<>(); - sources.addAll(UebTopicFactories.getSourceFactory().inventory()); - sources.addAll(DmaapTopicFactories.getSourceFactory().inventory()); + sources.addAll(KafkaTopicFactories.getSourceFactory().inventory()); sources.addAll(NoopTopicFactories.getSourceFactory().inventory()); return sources; @@ -202,34 +191,21 @@ class TopicEndpointProxy implements TopicEndpoint { } final List<TopicSource> sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) { - sources.add(uebSource); - } - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } + topicNames.forEach(topic -> { try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) { - sources.add(dmaapSource); - } + sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic))); } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); + logger.debug("No KAFKA source for topic: {}", topic, e); } try { - final TopicSource noopSource = this.getNoopTopicSource(topic); - if (noopSource != null) { - sources.add(noopSource); - } + sources.add(Objects.requireNonNull(this.getNoopTopicSource(topic))); } catch (final Exception e) { logger.debug("No NOOP source for topic: {}", topic, e); } - } + }); + return sources; } @@ -238,8 +214,7 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSink> sinks = new ArrayList<>(); - sinks.addAll(UebTopicFactories.getSinkFactory().inventory()); - sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory()); + sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory()); sinks.addAll(NoopTopicFactories.getSinkFactory().inventory()); return sinks; @@ -255,28 +230,13 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSink> sinks = new ArrayList<>(); for (final String topic : topicNames) { try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) { - sinks.add(uebSink); - } + sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic))); } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); + logger.debug("No KAFKA sink for topic: {}", topic, e); } try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) { - sinks.add(dmaapSink); - } - } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); - } - - try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) { - sinks.add(noopSink); - } + sinks.add(Objects.requireNonNull(this.getNoopTopicSink(topic))); } catch (final Exception e) { logger.debug("No NOOP sink for topic: {}", topic, e); } @@ -287,19 +247,13 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSink> getTopicSinks(String topicName) { if (topicName == null) { - throw parmException(null); + throw paramException(null); } final List<TopicSink> sinks = new ArrayList<>(); try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); + sinks.add(this.getKafkaTopicSink(topicName)); } catch (final Exception e) { logNoSink(topicName, e); } @@ -313,42 +267,24 @@ class TopicEndpointProxy implements TopicEndpoint { return sinks; } - @JsonIgnore - @GsonJsonIgnore - @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicFactories.getSourceFactory().inventory(); - } - - @JsonIgnore @GsonJsonIgnore @Override - public List<DmaapTopicSource> getDmaapTopicSources() { - return DmaapTopicFactories.getSourceFactory().inventory(); + public List<KafkaTopicSource> getKafkaTopicSources() { + return KafkaTopicFactories.getSourceFactory().inventory(); } - @JsonIgnore @GsonJsonIgnore @Override public List<NoopTopicSource> getNoopTopicSources() { return NoopTopicFactories.getSourceFactory().inventory(); } - @JsonIgnore - @GsonJsonIgnore @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicFactories.getSinkFactory().inventory(); - } - - @JsonIgnore @GsonJsonIgnore - @Override - public List<DmaapTopicSink> getDmaapTopicSinks() { - return DmaapTopicFactories.getSinkFactory().inventory(); + public List<KafkaTopicSink> getKafkaTopicSinks() { + return KafkaTopicFactories.getSinkFactory().inventory(); } - @JsonIgnore @GsonJsonIgnore @Override public List<NoopTopicSink> getNoopTopicSinks() { @@ -372,7 +308,7 @@ class TopicEndpointProxy implements TopicEndpoint { final List<Startable> endpoints = this.getEndpoints(); - boolean success = true; + var success = true; for (final Startable endpoint : endpoints) { try { success = endpoint.start() && success; @@ -398,7 +334,7 @@ class TopicEndpointProxy implements TopicEndpoint { final List<Startable> endpoints = this.getEndpoints(); - boolean success = true; + var success = true; for (final Startable endpoint : endpoints) { try { success = endpoint.stop() && success; @@ -416,7 +352,6 @@ class TopicEndpointProxy implements TopicEndpoint { * * @return list of managed endpoints */ - @JsonIgnore @GsonJsonIgnore protected List<Startable> getEndpoints() { final List<Startable> endpoints = new ArrayList<>(); @@ -431,11 +366,8 @@ class TopicEndpointProxy implements TopicEndpoint { public void shutdown() { this.stop(); - UebTopicFactories.getSourceFactory().destroy(); - UebTopicFactories.getSinkFactory().destroy(); - - DmaapTopicFactories.getSourceFactory().destroy(); - DmaapTopicFactories.getSinkFactory().destroy(); + KafkaTopicFactories.getSourceFactory().destroy(); + KafkaTopicFactories.getSinkFactory().destroy(); NoopTopicFactories.getSinkFactory().destroy(); NoopTopicFactories.getSourceFactory().destroy(); @@ -443,27 +375,22 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override - public boolean isAlive() { - return this.alive; - } - - @Override public boolean lock() { + boolean shouldLock; synchronized (this) { - if (this.locked) { - return true; - } - + shouldLock = !this.locked; this.locked = true; } - for (final TopicSource source : this.getTopicSources()) { - source.lock(); - } + if (shouldLock) { + for (final TopicSource source : this.getTopicSources()) { + source.lock(); + } - for (final TopicSink sink : this.getTopicSinks()) { - sink.lock(); + for (final TopicSink sink : this.getTopicSinks()) { + sink.lock(); + } } return true; @@ -471,88 +398,64 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public boolean unlock() { - synchronized (this) { - if (!this.locked) { - return true; - } + boolean shouldUnlock; + synchronized (this) { + shouldUnlock = this.locked; this.locked = false; } - for (final TopicSource source : this.getTopicSources()) { - source.unlock(); - } + if (shouldUnlock) { + for (final TopicSource source : this.getTopicSources()) { + source.unlock(); + } - for (final TopicSink sink : this.getTopicSinks()) { - sink.unlock(); + for (final TopicSink sink : this.getTopicSinks()) { + sink.unlock(); + } } return true; } @Override - public boolean isLocked() { - return this.locked; - } - - @Override public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { if (commType == null) { - throw parmException(topicName); + throw paramException(topicName); } if (topicName == null) { - throw parmException(null); + throw paramException(null); } - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - case NOOP: - return this.getNoopTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } + return switch (commType) { + case KAFKA -> this.getKafkaTopicSource(topicName); + case NOOP -> this.getNoopTopicSource(topicName); + default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); + }; } @Override public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { if (commType == null) { - throw parmException(topicName); + throw paramException(topicName); } if (topicName == null) { - throw parmException(null); + throw paramException(null); } - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicFactories.getSourceFactory().get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicFactories.getSinkFactory().get(topicName); + return switch (commType) { + case KAFKA -> this.getKafkaTopicSink(topicName); + case NOOP -> this.getNoopTopicSink(topicName); + default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); + }; } @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicFactories.getSourceFactory().get(topicName); + public KafkaTopicSource getKafkaTopicSource(String topicName) { + return KafkaTopicFactories.getSourceFactory().get(topicName); } @Override @@ -561,8 +464,8 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicFactories.getSinkFactory().get(topicName); + public KafkaTopicSink getKafkaTopicSink(String topicName) { + return KafkaTopicFactories.getSinkFactory().get(topicName); } @Override @@ -570,7 +473,7 @@ class TopicEndpointProxy implements TopicEndpoint { return NoopTopicFactories.getSinkFactory().get(topicName); } - private IllegalArgumentException parmException(String topicName) { + private IllegalArgumentException paramException(String topicName) { return new IllegalArgumentException( "Invalid parameter: a communication infrastructure required to fetch " + topicName); } @@ -579,4 +482,4 @@ class TopicEndpointProxy implements TopicEndpoint { logger.debug("No sink for topic: {}", topicName, ex); } -}
\ No newline at end of file +} |