diff options
author | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-07-01 07:15:00 +0100 |
---|---|---|
committer | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-07-22 20:17:46 +0100 |
commit | 2a2b5d085876480c1b0d9470a57c6cab4f51008c (patch) | |
tree | 64bd2ded017ad43bd995686e2e7b8f1ffa412a40 /policy-endpoints/src | |
parent | 15ede09d6ef4b52db05fab534eed7192991b1f98 (diff) |
Introduce Custom Kafka End point
Issue-ID: POLICY-4133
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: I2745f3af97e9bb83d94c5cb6d29dfd452d315506
Diffstat (limited to 'policy-endpoints/src')
25 files changed, 1532 insertions, 2 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java index 6532a198..69bee717 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java @@ -4,6 +4,7 @@ * ================================================================================ * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2019 Samsung Electronics Co., Ltd. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +45,10 @@ public interface Topic extends TopicRegisterable, Startable, Lockable { */ DMAAP, /** + * KAFKA Communication Infrastructure. + */ + KAFKA, + /** * NOOP for internal use only. */ NOOP, diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index bb707523..9bf3731a 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +27,8 @@ import org.onap.policy.common.capabilities.Lockable; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; @@ -160,6 +163,14 @@ public interface TopicEndpoint extends Startable, Lockable { NoopTopicSource getNoopTopicSource(String topicName); /** + * Get the Kafka Source for the given topic name. + * + * @param topicName the topic name. + * @return the Kafka Source. + */ + KafkaTopicSource getKafkaTopicSource(String topicName); + + /** * Get the Topic Sinks for the given topic name. * * @param topicNames the topic names @@ -237,6 +248,18 @@ public interface TopicEndpoint extends Startable, Lockable { DmaapTopicSink getDmaapTopicSink(String topicName); /** + * Get the KAFKA Topic Source for the given topic name. + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + KafkaTopicSink getKafkaTopicSink(String topicName); + + /** * Gets only the UEB Topic Sources. * * @return the UEB Topic Source List @@ -251,6 +274,13 @@ public interface TopicEndpoint extends Startable, Lockable { List<DmaapTopicSource> getDmaapTopicSources(); /** + * Gets only the KAFKA Topic Sources. + * + * @return the KAFKA Topic Source List + */ + List<KafkaTopicSource> getKafkaTopicSources(); + + /** * Gets only the NOOP Topic Sources. * * @return the NOOP Topic Source List @@ -272,9 +302,17 @@ public interface TopicEndpoint extends Startable, Lockable { List<DmaapTopicSink> getDmaapTopicSinks(); /** + * Gets only the KAFKA Topic Sinks. + * + * @return the KAFKA Topic Sinks List + */ + List<KafkaTopicSink> getKafkaTopicSinks(); + + /** * Gets only the NOOP Topic Sinks. * * @return the NOOP Topic Sinks List */ List<NoopTopicSink> getNoopTopicSinks(); + } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 293bf608..d37410e9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +30,9 @@ import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; @@ -95,6 +99,9 @@ class TopicEndpointProxy implements TopicEndpoint { case DMAAP: sources.add(DmaapTopicFactories.getSourceFactory().build(param)); break; + case KAFKA: + sources.add(KafkaTopicFactories.getSourceFactory().build(param)); + break; case NOOP: sources.add(NoopTopicFactories.getSourceFactory().build(param)); break; @@ -115,12 +122,14 @@ class TopicEndpointProxy implements TopicEndpoint { // 1. Create UEB Sources // 2. Create DMAAP Sources - // 3. Create NOOP Sources + // 3. Create KAFKA Sources + // 4. Create NOOP Sources List<TopicSource> sources = new ArrayList<>(); sources.addAll(UebTopicFactories.getSourceFactory().build(properties)); sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties)); + sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); lockSources(sources); @@ -146,6 +155,9 @@ class TopicEndpointProxy implements TopicEndpoint { case DMAAP: sinks.add(DmaapTopicFactories.getSinkFactory().build(param)); break; + case KAFKA: + sinks.add(KafkaTopicFactories.getSinkFactory().build(param)); + break; case NOOP: sinks.add(NoopTopicFactories.getSinkFactory().build(param)); break; @@ -165,12 +177,14 @@ class TopicEndpointProxy implements TopicEndpoint { public List<TopicSink> addTopicSinks(Properties properties) { // 1. Create UEB Sinks // 2. Create DMAAP Sinks - // 3. Create NOOP Sinks + // 3. Create KAFKA Sinks + // 4. Create NOOP Sinks final List<TopicSink> sinks = new ArrayList<>(); sinks.addAll(UebTopicFactories.getSinkFactory().build(properties)); sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties)); + sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); lockSinks(sinks); @@ -191,6 +205,7 @@ class TopicEndpointProxy implements TopicEndpoint { sources.addAll(UebTopicFactories.getSourceFactory().inventory()); sources.addAll(DmaapTopicFactories.getSourceFactory().inventory()); + sources.addAll(KafkaTopicFactories.getSourceFactory().inventory()); sources.addAll(NoopTopicFactories.getSourceFactory().inventory()); return sources; @@ -224,6 +239,15 @@ class TopicEndpointProxy implements TopicEndpoint { } try { + final TopicSource kafkaSource = this.getKafkaTopicSource(topic); + if (kafkaSource != null) { + sources.add(kafkaSource); + } + } catch (final Exception e) { + logger.debug("No KAFKA source for topic: {}", topic, e); + } + + try { final TopicSource noopSource = this.getNoopTopicSource(topic); if (noopSource != null) { sources.add(noopSource); @@ -242,6 +266,7 @@ class TopicEndpointProxy implements TopicEndpoint { sinks.addAll(UebTopicFactories.getSinkFactory().inventory()); sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory()); + sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory()); sinks.addAll(NoopTopicFactories.getSinkFactory().inventory()); return sinks; @@ -275,6 +300,15 @@ class TopicEndpointProxy implements TopicEndpoint { } try { + final TopicSink kafkaSink = this.getKafkaTopicSink(topic); + if (kafkaSink != null) { + sinks.add(kafkaSink); + } + } catch (final Exception e) { + logger.debug("No KAFKA sink for topic: {}", topic, e); + } + + try { final TopicSink noopSink = this.getNoopTopicSink(topic); if (noopSink != null) { sinks.add(noopSink); @@ -307,6 +341,12 @@ class TopicEndpointProxy implements TopicEndpoint { } try { + sinks.add(this.getKafkaTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { sinks.add(this.getNoopTopicSink(topicName)); } catch (final Exception e) { logNoSink(topicName, e); @@ -329,6 +369,12 @@ class TopicEndpointProxy implements TopicEndpoint { @GsonJsonIgnore @Override + public List<KafkaTopicSource> getKafkaTopicSources() { + return KafkaTopicFactories.getSourceFactory().inventory(); + } + + @GsonJsonIgnore + @Override public List<NoopTopicSource> getNoopTopicSources() { return NoopTopicFactories.getSourceFactory().inventory(); } @@ -345,6 +391,12 @@ class TopicEndpointProxy implements TopicEndpoint { return DmaapTopicFactories.getSinkFactory().inventory(); } + @Override + @GsonJsonIgnore + public List<KafkaTopicSink> getKafkaTopicSinks() { + return KafkaTopicFactories.getSinkFactory().inventory(); + } + @GsonJsonIgnore @Override public List<NoopTopicSink> getNoopTopicSinks() { @@ -432,6 +484,9 @@ class TopicEndpointProxy implements TopicEndpoint { DmaapTopicFactories.getSourceFactory().destroy(); DmaapTopicFactories.getSinkFactory().destroy(); + KafkaTopicFactories.getSourceFactory().destroy(); + KafkaTopicFactories.getSinkFactory().destroy(); + NoopTopicFactories.getSinkFactory().destroy(); NoopTopicFactories.getSourceFactory().destroy(); @@ -497,6 +552,8 @@ class TopicEndpointProxy implements TopicEndpoint { return this.getUebTopicSource(topicName); case DMAAP: return this.getDmaapTopicSource(topicName); + case KAFKA: + return this.getKafkaTopicSource(topicName); case NOOP: return this.getNoopTopicSource(topicName); default: @@ -519,6 +576,8 @@ class TopicEndpointProxy implements TopicEndpoint { return this.getUebTopicSink(topicName); case DMAAP: return this.getDmaapTopicSink(topicName); + case KAFKA: + return this.getKafkaTopicSink(topicName); case NOOP: return this.getNoopTopicSink(topicName); default: @@ -542,6 +601,11 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override + public KafkaTopicSource getKafkaTopicSource(String topicName) { + return KafkaTopicFactories.getSourceFactory().get(topicName); + } + + @Override public NoopTopicSource getNoopTopicSource(String topicName) { return NoopTopicFactories.getSourceFactory().get(topicName); } @@ -552,6 +616,11 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override + public KafkaTopicSink getKafkaTopicSink(String topicName) { + return KafkaTopicFactories.getSinkFactory().get(topicName); + } + + @Override public NoopTopicSink getNoopTopicSink(String topicName) { return NoopTopicFactories.getSinkFactory().get(topicName); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java new file mode 100644 index 00000000..23aaabd4 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java @@ -0,0 +1,198 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import com.google.re2j.Pattern; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineKafkaTopicSink; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils; +import org.onap.policy.common.endpoints.utils.PropertyUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of KAFKA Reader Topics indexed by topic name. + */ +class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory { + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class); + + /** + * KAFKA Topic Name Index. + */ + protected HashMap<String, KafkaTopicSink> kafkaTopicSinks = new HashMap<>(); + + @Override + public KafkaTopicSink build(BusTopicParams busTopicParams) { + + if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { + throw new IllegalArgumentException("KAFKA Server(s) must be provided"); + } + + if (StringUtils.isBlank(busTopicParams.getTopic())) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (kafkaTopicSinks.containsKey(busTopicParams.getTopic())) { + return kafkaTopicSinks.get(busTopicParams.getTopic()); + } + + KafkaTopicSink kafkaTopicWriter = makeSink(busTopicParams); + if (busTopicParams.isManaged()) { + kafkaTopicSinks.put(busTopicParams.getTopic(), kafkaTopicWriter); + } + + return kafkaTopicWriter; + } + } + + + @Override + public KafkaTopicSink build(List<String> servers, String topic) { + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .managed(true) + .useHttps(false) + .build()); + } + + + @Override + public List<KafkaTopicSink> build(Properties properties) { + + String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS); + if (StringUtils.isBlank(writeTopics)) { + logger.info("{}: no topic for KAFKA Sink", this); + return new ArrayList<>(); + } + + List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>(); + synchronized (this) { + for (String topic : COMMA_SPACE_PAT.split(writeTopics)) { + addTopic(newKafkaTopicSinks, topic, properties); + } + return newKafkaTopicSinks; + } + } + + private void addTopic(List<KafkaTopicSink> newKafkaTopicSinks, String topic, Properties properties) { + if (this.kafkaTopicSinks.containsKey(topic)) { + newKafkaTopicSinks.add(this.kafkaTopicSinks.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic; + + var props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no KAFKA servers configured for sink {}", this, topic); + return; + } + + KafkaTopicSink kafkaTopicWriter = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers) + .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) + .build()); + newKafkaTopicSinks.add(kafkaTopicWriter); + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + KafkaTopicSink kafkaTopicWriter; + synchronized (this) { + if (!kafkaTopicSinks.containsKey(topic)) { + return; + } + + kafkaTopicWriter = kafkaTopicSinks.remove(topic); + } + + kafkaTopicWriter.shutdown(); + } + + @Override + public void destroy() { + List<KafkaTopicSink> writers = this.inventory(); + for (KafkaTopicSink writer : writers) { + writer.shutdown(); + } + + synchronized (this) { + this.kafkaTopicSinks.clear(); + } + } + + @Override + public KafkaTopicSink get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (kafkaTopicSinks.containsKey(topic)) { + return kafkaTopicSinks.get(topic); + } else { + throw new IllegalStateException("KafkaTopicSink for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<KafkaTopicSink> inventory() { + return new ArrayList<>(this.kafkaTopicSinks.values()); + } + + /** + * Makes a new sink. + * + * @param busTopicParams parameters to use to configure the sink + * @return a new sink + */ + protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) { + return new InlineKafkaTopicSink(busTopicParams); + } + + + @Override + public String toString() { + return "IndexedKafkaTopicSinkFactory " + kafkaTopicSinks.keySet(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java new file mode 100644 index 00000000..47279d47 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java @@ -0,0 +1,191 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import com.google.re2j.Pattern; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedKafkaTopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils; +import org.onap.policy.common.endpoints.utils.PropertyUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of KAFKA Source Topics indexed by topic name. + */ +class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class); + + /** + * KAFKA Topic Name Index. + */ + protected HashMap<String, KafkaTopicSource> kafkaTopicSources = new HashMap<>(); + + @Override + public KafkaTopicSource build(BusTopicParams busTopicParams) { + if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { + throw new IllegalArgumentException("KAFKA Server(s) must be provided"); + } + + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (kafkaTopicSources.containsKey(busTopicParams.getTopic())) { + return kafkaTopicSources.get(busTopicParams.getTopic()); + } + + var kafkaTopicSource = makeSource(busTopicParams); + kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource); + + return kafkaTopicSource; + } + } + + @Override + public List<KafkaTopicSource> build(Properties properties) { + + String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS); + if (StringUtils.isBlank(readTopics)) { + logger.info("{}: no topic for KAFKA Source", this); + return new ArrayList<>(); + } + + List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>(); + synchronized (this) { + for (String topic : COMMA_SPACE_PAT.split(readTopics)) { + addTopic(newKafkaTopicSources, topic, properties); + } + } + return newKafkaTopicSources; + } + + @Override + public KafkaTopicSource build(List<String> servers, String topic) { + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .managed(true) + .useHttps(false).build()); + } + + private void addTopic(List<KafkaTopicSource> newKafkaTopicSources, String topic, Properties properties) { + if (this.kafkaTopicSources.containsKey(topic)) { + newKafkaTopicSources.add(this.kafkaTopicSources.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic; + + var props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no KAFKA servers configured for sink {}", this, topic); + return; + } + + var kafkaTopicSource = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers) + .build()); + + newKafkaTopicSources.add(kafkaTopicSource); + } + + /** + * Makes a new source. + * + * @param busTopicParams parameters to use to configure the source + * @return a new source + */ + protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) { + return new SingleThreadedKafkaTopicSource(busTopicParams); + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + KafkaTopicSource kafkaTopicSource; + + synchronized (this) { + if (!kafkaTopicSources.containsKey(topic)) { + return; + } + + kafkaTopicSource = kafkaTopicSources.remove(topic); + } + + kafkaTopicSource.shutdown(); + } + + @Override + public void destroy() { + List<KafkaTopicSource> readers = this.inventory(); + for (KafkaTopicSource reader : readers) { + reader.shutdown(); + } + + synchronized (this) { + this.kafkaTopicSources.clear(); + } + } + + @Override + public KafkaTopicSource get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (kafkaTopicSources.containsKey(topic)) { + return kafkaTopicSources.get(topic); + } else { + throw new IllegalStateException("KafkaTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<KafkaTopicSource> inventory() { + return new ArrayList<>(this.kafkaTopicSources.values()); + } + + @Override + public String toString() { + return "IndexedKafkaTopicSourceFactory " + kafkaTopicSources.keySet(); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java new file mode 100644 index 00000000..60db3857 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class KafkaTopicFactories { + + /** + * Factory for instantiation and management of sinks. + */ + @Getter + private static final KafkaTopicSinkFactory sinkFactory = new IndexedKafkaTopicSinkFactory(); + + /** + * Factory for instantiation and management of sources. + */ + @Getter + private static final KafkaTopicSourceFactory sourceFactory = new IndexedKafkaTopicSourceFactory(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java new file mode 100644 index 00000000..960a02c5 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +/** + * Topic Writer over KAFKA Infrastructure. + */ +public interface KafkaTopicSink extends BusTopicSink { + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java new file mode 100644 index 00000000..fa5e56f9 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; + +/** + * KAFKA Topic Sink Factory. + */ +public interface KafkaTopicSinkFactory { + + /** + * Instantiates a new KAFKA Topic Writer. + * + * @param busTopicParams parameters object + * @return an KAFKA Topic Sink + */ + KafkaTopicSink build(BusTopicParams busTopicParams); + + /** + * Creates an KAFKA Topic Writer based on properties files. + * + * @param properties Properties containing initialization values + * + * @return an KAFKA Topic Writer + * @throws IllegalArgumentException if invalid parameters are present + */ + List<KafkaTopicSink> build(Properties properties); + + /** + * Instantiates a new KAFKA Topic Writer. + * + * @param servers list of servers + * @param topic topic name + * + * @return an KAFKA Topic Writer + * @throws IllegalArgumentException if invalid parameters are present + */ + KafkaTopicSink build(List<String> servers, String topic); + + /** + * Destroys an KAFKA Topic Writer based on a topic. + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + void destroy(String topic); + + /** + * Destroys all KAFKA Topic Writers. + */ + void destroy(); + + /** + * gets an KAFKA Topic Writer based on topic name. + * + * @param topic the topic name + * + * @return an KAFKA Topic Writer with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the KAFKA Topic Reader is an incorrect state + */ + KafkaTopicSink get(String topic); + + /** + * Provides a snapshot of the KAFKA Topic Writers. + * + * @return a list of the KAFKA Topic Writers + */ + List<KafkaTopicSink> inventory(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java new file mode 100644 index 00000000..03efd083 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +/** + * Kafka Topic Source. + */ +public interface KafkaTopicSource extends BusTopicSource { + +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java new file mode 100644 index 00000000..8cb51df8 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; + +/** + * Kafka Topic Source Factory. + */ +public interface KafkaTopicSourceFactory { + + /** + * Creates an Kafka Topic Source based on properties files. + * + * @param properties Properties containing initialization values + * + * @return an Kafka Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + List<KafkaTopicSource> build(Properties properties); + + /** + * Instantiates a new Kafka Topic Source. + * + * @param busTopicParams parameters object + * @return an Kafka Topic Source + */ + KafkaTopicSource build(BusTopicParams busTopicParams); + + /** + * Instantiates a new Kafka Topic Source. + * + * @param servers list of servers + * @param topic topic name + * + * @return an Kafka Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + KafkaTopicSource build(List<String> servers, String topic); + + /** + * Destroys an Kafka Topic Source based on a topic. + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + void destroy(String topic); + + /** + * Destroys all Kafka Topic Sources. + */ + void destroy(); + + /** + * Gets an Kafka Topic Source based on topic name. + * + * @param topic the topic name + * @return an Kafka Topic Source with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the Kafka Topic Source is an incorrect state + */ + KafkaTopicSource get(String topic); + + /** + * Provides a snapshot of the Kafka Topic Sources. + * + * @return a list of the Kafka Topic Sources + */ + List<KafkaTopicSource> inventory(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 20f4c91c..8d88b0d9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,13 +29,19 @@ import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; @@ -219,6 +226,53 @@ public interface BusConsumer { } /** + * Kafka based consumer. + */ + public static class KafkaConsumerWrapper extends FetchingBusConsumer { + + /** + * logger. + */ + private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); + + /** + * Kafka consumer. + */ + private KafkaConsumer<String, String> consumer; + + /** + * Kafka Consumer Wrapper. + * BusTopicParam object contains the following parameters + * servers messaging bus hosts. + * topic topic + * + * @param busTopicParams - The parameters for the bus topic + * @throws GeneralSecurityException - Security exception + * @throws MalformedURLException - Malformed URL exception + */ + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + @Override + public Iterable<String> fetch() throws IOException { + // TODO: Not implemented yet + return new ArrayList<>(); + } + + @Override + public void close() { + super.close(); + this.consumer.close(); + } + + @Override + public String toString() { + return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; + } + } + + /** * MR based consumer. */ public abstract class DmaapConsumerWrapper extends FetchingBusConsumer { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 8bf805bf..e0df7095 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,8 +32,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.CompressionType; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; @@ -144,6 +150,58 @@ public interface BusPublisher { } /** + * Kafka based library publisher. + */ + public static class KafkaPublisherWrapper implements BusPublisher { + + private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + + /** + * The actual Kafka publisher. + */ + private final KafkaProducer producer; + + /** + * Constructor. + * + * @param busTopicParams topic parameters + */ + public KafkaPublisherWrapper(BusTopicParams busTopicParams) { + // TODO Setting of topic parameters is not implemented yet. + //Setup Properties for Kafka Producer + Properties kafkaProps = new Properties(); + this.producer = new KafkaProducer(kafkaProps); + } + + @Override + public boolean send(String partitionId, String message) { + if (message == null) { + throw new IllegalArgumentException("No message provided"); + } + // TODO Sending messages is not implemented yet + return true; + } + + @Override + public void close() { + logger.info("{}: CLOSE", this); + + try (this.producer) { + this.producer.close(); + } catch (Exception e) { + logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); + } + } + + + @Override + public String toString() { + return "KafkaPublisherWrapper []"; + } + + } + + /** * DmaapClient library wrapper. */ public abstract class DmaapPublisherWrapper implements BusPublisher { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java new file mode 100644 index 00000000..b564229b --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation publishes events for the associated KAFKA topic, inline with the calling + * thread. + */ +public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink { + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + + /** + * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned + * attributes. + * + * <p>servers list of KAFKA servers available for publishing + * topic the topic to publish to + * partitionId the partition key (optional, autogenerated if not provided) + * useHttps does connection use HTTPS? + * @param busTopicParams contains attributes needed + * @throws IllegalArgumentException if invalid arguments are detected + */ + public InlineKafkaTopicSink(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + /** + * Instantiation of internal resources. + */ + @Override + public void init() { + + this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.effectiveTopic) + .useHttps(this.useHttps) + .build()); + logger.info("{}: KAFKA SINK created", this); + } + + @Override + public String toString() { + return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + + super.toString() + "]"; + } + + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.KAFKA; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java new file mode 100644 index 00000000..b8362b83 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; + +/** + * This topic source implementation specializes in reading messages over an Kafka Bus topic source and + * notifying its listeners. + */ +public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { + + /** + * Constructor. + * + * @param busTopicParams Parameters object containing all the required inputs + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) { + super(busTopicParams); + this.init(); + } + + /** + * Initialize the Cambria client. + */ + @Override + public void init() { + this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.effectiveTopic) + .useHttps(this.useHttps) + .build()); + } + + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.KAFKA; + } + + @Override + public String toString() { + return "SingleThreadedKafkaTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + + ", toString()=" + super.toString() + "]"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java index 08ed2624..49dff287 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,6 +80,11 @@ public final class PolicyEndPointProperties { public static final String PROPERTY_NOOP_SOURCE_TOPICS = "noop.source.topics"; public static final String PROPERTY_NOOP_SINK_TOPICS = "noop.sink.topics"; + /* KAFKA Properties */ + + public static final String PROPERTY_KAFKA_SOURCE_TOPICS = "kafka.source.topics"; + public static final String PROPERTY_KAFKA_SINK_TOPICS = "kafka.sink.topics"; + /* HTTP Server Properties */ public static final String PROPERTY_HTTP_SERVER_SERVICES = "http.server.services"; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java new file mode 100644 index 00000000..3e62f98f --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.utils; + +import com.google.re2j.Pattern; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class KafkaPropertyUtils { + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); + + /** + * Makes a topic builder, configuring it with properties that are common to both + * sources and sinks. + * + * @param props properties to be used to configure the builder + * @param topic topic being configured + * @param servers target servers + * @return a topic builder + */ + public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { + + final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers))); + //TODO More Kafka properties to be added + return BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, + topic)) + .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true)); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java new file mode 100644 index 00000000..3986549c --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; + +import java.util.Collections; +import org.onap.policy.common.endpoints.event.comm.Topic; + +/** + * Base class for KafkaTopicXxxFactory tests. + * + * @param <T> type of topic managed by the factory + */ +public abstract class KafkaTopicFactoryTestBase<T extends Topic> extends BusTopicFactoryTestBase<T> { + + @Override + public void testBuildBusTopicParams_Ex() { + + super.testBuildBusTopicParams_Ex(); + + // null servers + assertThatIllegalArgumentException().as("null servers") + .isThrownBy(() -> buildTopic(makeBuilder().servers(null).build())); + + // empty servers + assertThatIllegalArgumentException().as("empty servers") + .isThrownBy(() -> buildTopic(makeBuilder().servers(Collections.emptyList()).build())); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java new file mode 100644 index 00000000..1a815e1a --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC; +import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX; + +import java.util.Arrays; +import lombok.Getter; +import org.onap.policy.common.endpoints.parameters.TopicParameters; + +public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { + + public static final String SERVER = "my-server"; + public static final String TOPIC2 = "my-topic-2"; + + @Getter + private TopicParameters params = new TopicParameters(); + + /** + * Constructs the object. + * + * @param prefix the prefix for the properties to be built + */ + public KafkaTopicPropertyBuilder(String prefix) { + super(prefix); + } + + /** + * Adds a topic and configures it's properties with default values. + * + * @param topic the topic to be added + * @return this builder + */ + public KafkaTopicPropertyBuilder makeTopic(String topic) { + addTopic(topic); + + setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC); + setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true"); + setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true"); + setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION); + setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + + params.setTopicCommInfrastructure("kafka"); + params.setTopic(topic); + params.setEffectiveTopic(MY_EFFECTIVE_TOPIC); + params.setManaged(true); + params.setUseHttps(true); + params.setPartitionId(MY_PARTITION); + params.setServers(Arrays.asList(SERVER)); + + return this; + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java new file mode 100644 index 00000000..503e5131 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +public class KafkaTopicSinkTest { + + @Test + public void test() { + assertNotNull(KafkaTopicFactories.getSinkFactory()); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java new file mode 100644 index 00000000..6fa80a41 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java @@ -0,0 +1,158 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSource> { + + private SourceFactory factory; + + /** + * Creates the object to be tested. + */ + @Before + @Override + public void setUp() { + super.setUp(); + + factory = new SourceFactory(); + } + + @After + public void tearDown() { + factory.destroy(); + } + + @Test + @Override + public void testBuildBusTopicParams() { + super.testBuildBusTopicParams_Ex(); + } + + @Test + @Override + public void testBuildProperties() { + + initFactory(); + + List<KafkaTopicSource> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()); + assertEquals(1, topics.size()); + assertEquals(MY_TOPIC, topics.get(0).getTopic()); + } + + @Test + @Override + public void testDestroyString_testGet_testInventory() { + super.testDestroyString_Ex(); + } + + @Test + public void testGet() { + super.testGet_Ex(); + } + + @Test + public void testToString() { + assertTrue(factory.toString().startsWith("IndexedKafkaTopicSourceFactory [")); + } + + @Override + protected void initFactory() { + if (factory != null) { + factory.destroy(); + } + + factory = new SourceFactory(); + } + + @Override + protected List<KafkaTopicSource> buildTopics(Properties properties) { + return factory.build(properties); + } + + @Override + protected KafkaTopicSource buildTopic(BusTopicParams params) { + return factory.build(params); + } + + @Override + protected KafkaTopicSource buildTopic(List<String> servers, String topic) { + return factory.build(servers, topic); + } + + @Override + protected void destroyFactory() { + factory.destroy(); + } + + @Override + protected void destroyTopic(String topic) { + factory.destroy(topic); + } + + @Override + protected List<KafkaTopicSource> getInventory() { + return factory.inventory(); + } + + @Override + protected KafkaTopicSource getTopic(String topic) { + return factory.get(topic); + } + + @Override + protected BusTopicParams getLastParams() { + return factory.params.getLast(); + } + + @Override + protected TopicPropertyBuilder makePropBuilder() { + return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SOURCE_TOPICS); + } + + /** + * Factory that records the parameters of all of the sources it creates. + */ + private static class SourceFactory extends IndexedKafkaTopicSourceFactory { + private Deque<BusTopicParams> params = new LinkedList<>(); + + @Override + protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) { + params.add(busTopicParams); + return super.makeSource(busTopicParams); + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java new file mode 100644 index 00000000..ee2d1d7b --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +public class KafkaTopicSourceTest { + + @Test + public void verifyKafkaTopicFactoriesNotNull() { + assertNotNull(KafkaTopicFactories.getSourceFactory()); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 21050f97..da9f792b 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -46,6 +46,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.Dmaa import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.powermock.reflect.Whitebox; @@ -295,6 +296,17 @@ public class BusConsumerTest extends TopicTestBase { new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()); } + @Test + public void testKafkaConsumerWrapper() throws Exception { + // verify that different wrappers can be built + assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException(); + } + + @Test + public void testKafkaConsumerWrapperToString() throws Exception { + assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString()); + } + private static class FetchingBusConsumerImpl extends FetchingBusConsumer { protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) { diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java new file mode 100644 index 00000000..cc096585 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; +import org.onap.policy.common.utils.gson.GsonTestUtils; + +public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase { + private SingleThreadedKafkaTopicSource source; + + /** + * Creates the object to be tested. + */ + @Before + @Override + public void setUp() { + super.setUp(); + + source = new SingleThreadedKafkaTopicSource(makeBuilder().build()); + } + + @After + public void tearDown() { + source.shutdown(); + } + + @Test + public void testToString() { + assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource [")); + } + + @Test + public void testGetTopicCommInfrastructure() { + assertEquals(CommInfrastructure.KAFKA, source.getTopicCommInfrastructure()); + } + +} diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json new file mode 100644 index 00000000..7c512a87 --- /dev/null +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json @@ -0,0 +1,11 @@ +{ + "servers" : [ "svra", "svrb" ], + "topic" : "my-topic", + "effectiveTopic" : "my-effective-topic", + "recentEvents" : [ ], + "alive" : false, + "locked" : false, + "useHttps" : true, + "topicCommInfrastructure" : "KAFKA", + "partitionKey" : "my-partition" +} diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json new file mode 100644 index 00000000..626d87e8 --- /dev/null +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json @@ -0,0 +1,10 @@ +{ + "servers" : [ "svra", "svrb" ], + "topic" : "my-topic", + "effectiveTopic" : "my-effective-topic", + "recentEvents" : [ ], + "alive" : false, + "locked" : false, + "useHttps" : true, + "topicCommInfrastructure" : "KAFKA" +} |