diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus')
16 files changed, 16 insertions, 1553 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java index 5ca87732..4073f5a7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java @@ -3,7 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2023 Nordix Foundation. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import org.onap.policy.common.endpoints.event.comm.TopicSink; /** - * Topic Sink over Bus Infrastructure (DMAAP/UEB). + * Topic Sink over Bus Infrastructure (KAFKA/UEB). */ public interface BusTopicSink extends ApiKeyEnabled, TopicSink { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java index cd9bc015..f1af8a21 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +24,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import org.onap.policy.common.endpoints.event.comm.TopicSource; /** - * Generic Topic Source for UEB/DMAAP Communication Infrastructure. + * Generic Topic Source for UEB/KAFKA Communication Infrastructure. * */ public interface BusTopicSource extends ApiKeyEnabled, TopicSource { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java deleted file mode 100644 index d5a46f8f..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import lombok.Getter; - -public class DmaapTopicFactories { - - /** - * Factory for instantiation and management of sinks. - */ - @Getter - private static final DmaapTopicSinkFactory sinkFactory = new IndexedDmaapTopicSinkFactory(); - - /** - * Factory for instantiation and management of sources. - */ - @Getter - private static final DmaapTopicSourceFactory sourceFactory = new IndexedDmaapTopicSourceFactory(); - - - private DmaapTopicFactories() { - // do nothing - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java deleted file mode 100644 index 805ed108..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java +++ /dev/null @@ -1,25 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -public interface DmaapTopicSink extends BusTopicSink { - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java deleted file mode 100644 index 4409e827..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.List; -import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -/** - * DMAAP Topic Sink Factory. - */ -public interface DmaapTopicSinkFactory { - - /** - * <pre> - * Instantiate a new DMAAP Topic Sink, with following params. - * servers list of servers - * topic topic name - * apiKey API Key - * apiSecret API Secret - * userName AAF user name - * password AAF password - * partitionKey Consumer Group - * environment DME2 environment - * aftEnvironment DME2 AFT environment - * partner DME2 Partner - * latitude DME2 latitude - * longitude DME2 longitude - * additionalProps additional properties to pass to DME2 - * managed is this sink endpoint managed? - * </pre> - * @param busTopicParams parameter object - * @return DmaapTopicSink object - * @throws IllegalArgumentException if invalid parameters are present - */ - DmaapTopicSink build(BusTopicParams busTopicParams); - - /** - * Creates an DMAAP Topic Sink based on properties files. - * - * @param properties Properties containing initialization values - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - List<DmaapTopicSink> build(Properties properties); - - /** - * Instantiates a new DMAAP Topic Sink. - * - * @param servers list of servers - * @param topic topic name - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - DmaapTopicSink build(List<String> servers, String topic); - - /** - * Destroys an DMAAP Topic Sink based on a topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all DMAAP Topic Sinks. - */ - void destroy(); - - /** - * Gets an DMAAP Topic Sink based on topic name. - * - * @param topic the topic name - * @return an DMAAP Topic Sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state - */ - DmaapTopicSink get(String topic); - - /** - * Provides a snapshot of the DMAAP Topic Sinks. - * - * @return a list of the DMAAP Topic Sinks - */ - List<DmaapTopicSink> inventory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java deleted file mode 100644 index 9893fa15..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java +++ /dev/null @@ -1,25 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -public interface DmaapTopicSource extends BusTopicSource { - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java deleted file mode 100644 index 7b1f185b..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.List; -import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -/** - * DMAAP Topic Source Factory. - */ -public interface DmaapTopicSourceFactory { - - /** - * Creates an DMAAP Topic Source based on properties files. - * - * @param properties Properties containing initialization values - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - List<DmaapTopicSource> build(Properties properties); - - /** - * Instantiates a new DMAAP Topic Source. - * - * @param busTopicParams parameters object - * @return a DMAAP Topic Source - */ - DmaapTopicSource build(BusTopicParams busTopicParams); - - /** - * Instantiates a new DMAAP Topic Source. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); - - /** - * Instantiates a new DMAAP Topic Source. - * - * @param servers list of servers - * @param topic topic name - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - DmaapTopicSource build(List<String> servers, String topic); - - /** - * Destroys an DMAAP Topic Source based on a topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all DMAAP Topic Sources. - */ - void destroy(); - - /** - * Gets an DMAAP Topic Source based on topic name. - * - * @param topic the topic name - * @return an DMAAP Topic Source with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state - */ - DmaapTopicSource get(String topic); - - /** - * Provides a snapshot of the DMAAP Topic Sources. - * - * @return a list of the DMAAP Topic Sources - */ - List<DmaapTopicSource> inventory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java deleted file mode 100644 index dfdadd1a..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import com.google.re2j.Pattern; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of DMAAP Reader Topics indexed by topic name. - */ -class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { - - private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class); - - /** - * DMAAP Topic Name Index. - */ - protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>(); - - @Override - public DmaapTopicSink build(BusTopicParams busTopicParams) { - - if (StringUtils.isBlank(busTopicParams.getTopic())) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) { - return dmaapTopicWriters.get(busTopicParams.getTopic()); - } - - var dmaapTopicSink = makeSink(busTopicParams); - - if (busTopicParams.isManaged()) { - dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink); - } - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List<String> servers, String topic) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - @Override - public List<DmaapTopicSink> build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); - if (StringUtils.isBlank(writeTopics)) { - logger.info("{}: no topic for DMaaP Sink", this); - return new ArrayList<>(); - } - - List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>(); - synchronized (this) { - for (String topic : COMMA_SPACE_PAT.split(writeTopics)) { - addTopic(newDmaapTopicSinks, properties, topic); - } - return newDmaapTopicSinks; - } - } - - private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) { - if (this.dmaapTopicWriters.containsKey(topic)) { - newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic; - - var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - return; - } - - var dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) - .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) - .build()); - - newDmaapTopicSinks.add(dmaapTopicSink); - } - - /** - * Makes a new sink. - * - * @param busTopicParams parameters to use to configure the sink - * @return a new sink - */ - protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) { - return new InlineDmaapTopicSink(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSink dmaapTopicWriter; - synchronized (this) { - if (!dmaapTopicWriters.containsKey(topic)) { - return; - } - - dmaapTopicWriter = dmaapTopicWriters.remove(topic); - } - - dmaapTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List<DmaapTopicSink> writers = this.inventory(); - for (DmaapTopicSink writer : writers) { - writer.shutdown(); - } - - synchronized (this) { - this.dmaapTopicWriters.clear(); - } - } - - @Override - public DmaapTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<DmaapTopicSink> inventory() { - return new ArrayList<>(this.dmaapTopicWriters.values()); - } - - @Override - public String toString() { - return "IndexedDmaapTopicSinkFactory " + dmaapTopicWriters.keySet(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java deleted file mode 100644 index 66960b15..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import com.google.re2j.Pattern; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of DMAAP Source Topics indexed by topic name. - */ - -class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { - private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class); - - /** - * DMaaP Topic Name Index. - */ - protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>(); - - @Override - public DmaapTopicSource build(BusTopicParams busTopicParams) { - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) { - return dmaapTopicSources.get(busTopicParams.getTopic()); - } - - var dmaapTopicSource = makeSource(busTopicParams); - - if (busTopicParams.isManaged()) { - dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource); - } - return dmaapTopicSource; - } - } - - @Override - public List<DmaapTopicSource> build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - if (StringUtils.isBlank(readTopics)) { - logger.info("{}: no topic for DMaaP Source", this); - return new ArrayList<>(); - } - - List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>(); - synchronized (this) { - for (String topic : COMMA_SPACE_PAT.split(readTopics)) { - addTopic(dmaapTopicSourceLst, properties, topic); - } - } - return dmaapTopicSourceLst; - } - - @Override - public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH) - .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - @Override - public DmaapTopicSource build(List<String> servers, String topic) { - return this.build(servers, topic, null, null); - } - - private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) { - if (this.dmaapTopicSources.containsKey(topic)) { - dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic; - - var props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - return; - } - - DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) - .consumerGroup(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) - .consumerInstance(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) - .fetchTimeout(props.getInteger( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, - PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)) - .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, - PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)) - .build()); - - dmaapTopicSourceLst.add(uebTopicSource); - } - - /** - * Makes a new source. - * - * @param busTopicParams parameters to use to configure the source - * @return a new source - */ - protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) { - return new SingleThreadedDmaapTopicSource(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSource uebTopicSource; - - synchronized (this) { - if (!dmaapTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = dmaapTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - @Override - public void destroy() { - List<DmaapTopicSource> readers = this.inventory(); - for (DmaapTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.dmaapTopicSources.clear(); - } - } - - @Override - public DmaapTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } else { - throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<DmaapTopicSource> inventory() { - return new ArrayList<>(this.dmaapTopicSources.values()); - } - - @Override - public String toString() { - return "IndexedDmaapTopicSourceFactory " + dmaapTopicSources.keySet(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index d6fa0645..539a78c2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -55,11 +55,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.jetbrains.annotations.NotNull; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -377,254 +372,6 @@ public interface BusConsumer { return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; } } - - /** - * MR based consumer. - */ - public abstract class DmaapConsumerWrapper extends FetchingBusConsumer { - - /** - * logger. - */ - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); - - /** - * Name of the "protocol" property. - */ - protected static final String PROTOCOL_PROP = "Protocol"; - - /** - * MR Consumer. - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper. - * - * <p>servers - messaging bus hosts - * topic - topic - * apiKey - API Key - * apiSecret - API Secret - * username - AAF Login - * password - AAF Password - * consumerGroup - Consumer Group - * consumerInstance - Consumer Instance - * fetchTimeout - Fetch Timeout - * fetchLimit - Fetch Limit - * - * @param busTopicParams contains above listed attributes - * @throws MalformedURLException URL should be valid - */ - protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - super(busTopicParams); - - if (busTopicParams.isTopicInvalid()) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - this.consumer = new MRConsumerImplBuilder() - .setHostPart(busTopicParams.getServers()) - .setTopic(busTopicParams.getTopic()) - .setConsumerGroup(busTopicParams.getConsumerGroup()) - .setConsumerId(busTopicParams.getConsumerInstance()) - .setTimeoutMs(busTopicParams.getFetchTimeout()) - .setLimit(busTopicParams.getFetchLimit()) - .setApiKey(busTopicParams.getApiKey()) - .setApiSecret(busTopicParams.getApiSecret()) - .createMRConsumerImpl(); - - this.consumer.setUsername(busTopicParams.getUserName()); - this.consumer.setPassword(busTopicParams.getPassword()); - } - - @Override - public Iterable<String> fetch() { - final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - sleepAfterFetchFailure(); - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - if (!"200".equals(response.getResponseCode())) { - - logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - sleepAfterFetchFailure(); - - /* fall through */ - } - } - - if (response.getActualMessages() == null) { - return new ArrayList<>(); - } else { - return response.getActualMessages(); - } - } - - @Override - public void close() { - super.close(); - this.consumer.close(); - } - - @Override - public String toString() { - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } - } - - /** - * MR based consumer. - */ - class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - /** - * BusTopicParams contain the following parameters. - * MR Consumer Wrapper. - * - * <p>servers messaging bus hosts - * topic - topic - * apiKey - API Key - * apiSecret - API Secret - * aafLogin - AAF Login - * aafPassword - AAF Password - * consumerGroup - Consumer Group - * consumerInstance - Consumer Instance - * fetchTimeout - Fetch Timeout - * fetchLimit - Fetch Limit - * - * @param busTopicParams contains above listed params - * @throws MalformedURLException URL should be valid - */ - public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - super(busTopicParams); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if (busTopicParams.isServersInvalid()) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); - } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - Properties props = new Properties(); - - if (busTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904"); - } - - this.consumer.setProps(props); - logger.info("{}: CREATION", this); - } - - @Override - public String toString() { - final MRConsumerImpl consumer = this.consumer; - - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } - } - - class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - /** - * Constructor. - * - * @param busTopicParams topic parameters - * @throws MalformedURLException must provide a valid URL - */ - public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - - super(busTopicParams); - - - final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null); - - BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - - final String serviceName = busTopicParams.getServers().get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(busTopicParams.getUserName()); - this.consumer.setPassword(busTopicParams.getPassword()); - - Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer); - - MRClientFactory.prop = props; - this.consumer.setProps(props); - - logger.info("{}: CREATION", this); - } - - @NotNull - private static Properties getProperties(BusTopicParams busTopicParams, String serviceName, - String dme2RouteOffer) { - Properties props = new Properties(); - - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", busTopicParams.getUserName()); - props.setProperty("password", busTopicParams.getPassword()); - - /* These are required, no defaults */ - props.setProperty("topic", busTopicParams.getTopic()); - - BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props); - - props.setProperty("MethodType", "GET"); - - if (busTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - } - - props.setProperty("contenttype", "application/json"); - - if (busTopicParams.isAdditionalPropsValid()) { - props.putAll(busTopicParams.getAdditionalProps()); - } - return props; - } - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java deleted file mode 100644 index 298607b5..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java +++ /dev/null @@ -1,95 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP POLICY - * ================================================================================ - * Copyright (C) 2023 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END============================================ - * =================================================================== - * - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import java.util.Properties; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; - -public class BusHelper { - - private BusHelper() { - /* no constructor */ - } - - /** - * Complete the properties param with common fields for both BusConsumer and BusPublisher. - * @param busTopicParams topics - * @param dme2RouteOffer route - * @param props properties - */ - public static void setCommonProperties(BusTopicParams busTopicParams, String dme2RouteOffer, Properties props) { - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); - } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - } - - /** - * Throws exception when any of the checks are invalid. - * @param busTopicParams topics - * @param topicType topic type (sink or source) - */ - public static void validateBusTopicParams(BusTopicParams busTopicParams, String topicType) { - if (busTopicParams.isEnvironmentInvalid()) { - throw paramException(busTopicParams.getTopic(), topicType, - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw paramException(busTopicParams.getTopic(), topicType, - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw paramException(busTopicParams.getTopic(), topicType, - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw paramException(busTopicParams.getTopic(), topicType, - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } - } - - private static IllegalArgumentException paramException(String topic, String topicType, String propertyName) { - return new IllegalArgumentException("Missing " + topicType + "." - + topic + propertyName + " property for DME2 in DMaaP"); - - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index def8f841..e2adde0d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -5,7 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. - * Modifications Copyright (C) 2022-2023 Nordix Foundation. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,21 +29,12 @@ import com.att.nsa.cambria.client.CambriaClientBuilders; import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; import java.net.MalformedURLException; import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -239,202 +230,4 @@ public interface BusPublisher { } } - - /** - * DmaapClient library wrapper. - */ - abstract class DmaapPublisherWrapper implements BusPublisher { - - private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); - - /** - * MR based Publisher. - */ - protected MRSimplerBatchPublisher publisher; - protected Properties props; - - /** - * MR Publisher Wrapper. - * - * @param servers messaging bus hosts - * @param topic topic - * @param username AAF or DME2 Login - * @param password AAF or DME2 Password - */ - protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic, - String username, String password, boolean useHttps) { - - if (StringUtils.isBlank(topic)) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - configureProtocol(topic, protocol, servers, useHttps); - - this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - - this.publisher.setUsername(username); - this.publisher.setPassword(password); - - props = new Properties(); - - props.setProperty("Protocol", (useHttps ? "https" : "http")); - props.setProperty("contenttype", "application/json"); - props.setProperty("username", username); - props.setProperty("password", password); - - props.setProperty("topic", topic); - - this.publisher.setProps(props); - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - this.publisher.setHost(servers.get(0)); - } - - logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); - } - - private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers, - boolean useHttps) { - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); - } - - ArrayList<String> dmaapServers = new ArrayList<>(); - String port = useHttps ? ":3905" : ":3904"; - for (String server : servers) { - dmaapServers.add(server + port); - } - - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - } else if (protocol == ProtocolTypeConstants.DME2) { - ArrayList<String> dmaapServers = new ArrayList<>(); - dmaapServers.add("0.0.0.0:3904"); - - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - } else { - throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); - } - } - - @Override - public void close() { - logger.info(LOG_CLOSE, this); - - try { - this.publisher.close(1, TimeUnit.SECONDS); - - } catch (InterruptedException e) { - logger.warn(LOG_CLOSE_FAILED, this, e); - Thread.currentThread().interrupt(); - - } catch (Exception e) { - logger.warn(LOG_CLOSE_FAILED, this, e); - } - } - - @Override - public boolean send(String partitionId, String message) { - if (message == null) { - throw new IllegalArgumentException(NO_MESSAGE_PROVIDED); - } - - this.publisher.setPubResponse(new MRPublisherResponse()); - this.publisher.send(partitionId, message); - MRPublisherResponse response = this.publisher.sendBatchWithResponse(); - if (response != null) { - logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - } - - return true; - } - - @Override - public String toString() { - return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate() - + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" - + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() - + ", publisher.getUsername()=" + publisher.getUsername() + "]"; - } - } - - /** - * DmaapClient library wrapper. - */ - class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { - /** - * MR based Publisher. - */ - public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword, - boolean useHttps) { - - super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); - } - } - - class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { - - /** - * Constructor. - * - * @param busTopicParams topic parameters - */ - public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) { - - super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(), - busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps()); - - String dme2RouteOffer = busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null; - - validateParams(busTopicParams, dme2RouteOffer); - - String serviceName = busTopicParams.getServers().get(0); - - /* These are required, no defaults */ - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - - BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props); - - props.setProperty("MethodType", "POST"); - - if (busTopicParams.isAdditionalPropsValid()) { - addAdditionalProps(busTopicParams); - } - - this.publisher.setProps(props); - } - - private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) { - BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); - - if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException("Must provide at least " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - } - - private void addAdditionalProps(BusTopicParams busTopicParams) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - if (value != null) { - props.setProperty(key, value); - } - } - } - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index 7cc8f8b6..53a6ab66 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019, 2023 Nordix Foundation. + * Modifications Copyright (C) 2019, 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,14 +33,14 @@ import org.apache.commons.lang3.StringUtils; /** * Member variables of this Params class are as follows. * - * <p>servers DMaaP servers - * topic DMaaP Topic to be monitored - * apiKey DMaaP API Key (optional) - * apiSecret DMaaP API Secret (optional) - * consumerGroup DMaaP Reader Consumer Group - * consumerInstance DMaaP Reader Instance - * fetchTimeout DMaaP fetch timeout - * fetchLimit DMaaP fetch limit + * <p>servers Kafka servers + * topic Kafka Topic to be monitored + * apiKey Kafka API Key (optional) + * apiSecret Kafka API Secret (optional) + * consumerGroup kafka Reader Consumer Group + * consumerInstance Kafka Reader Instance + * fetchTimeout kafka fetch timeout + * fetchLimit Kafka fetch limit * environment DME2 Environment * aftEnvironment DME2 AFT Environment * partner DME2 Partner diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index 7c740abf..dfdc7b3c 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -5,7 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. - * Modifications Copyright (C) 2023 Nordix Foundation. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; /** * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink - * regardless if it is UEB or DMaaP. + * regardless if it is UEB or Kafka. * */ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java deleted file mode 100644 index 771efb33..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ /dev/null @@ -1,132 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import java.util.Map; -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This implementation publishes events for the associated DMAAP topic, inline with the calling - * thread. - */ -public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink { - - protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class); - - protected final String userName; - protected final String password; - - protected String environment = null; - protected String aftEnvironment = null; - protected String partner = null; - protected String latitude = null; - protected String longitude = null; - - protected Map<String, String> additionalProps = null; - - /** - * BusTopicParams contains the below mentioned attributes. - * servers DMaaP servers - * topic DMaaP Topic to be monitored - * apiKey DMaaP API Key (optional) - * apiSecret DMaaP API Secret (optional) - * environment DME2 Environment - * aftEnvironment DME2 AFT Environment - * partner DME2 Partner - * latitude DME2 Latitude - * longitude DME2 Longitude - * additionalProps Additional properties to pass to DME2 - * useHttps does connection use HTTPS? - * allowTracing is tracing allowed? - * allowSelfSignedCerts are self-signed certificates allow - * @param busTopicParams Contains the above mentioned parameters - * @throws IllegalArgumentException An invalid parameter passed in - */ - public InlineDmaapTopicSink(BusTopicParams busTopicParams) { - - super(busTopicParams); - - this.userName = busTopicParams.getUserName(); - this.password = busTopicParams.getPassword(); - - this.environment = busTopicParams.getEnvironment(); - this.aftEnvironment = busTopicParams.getAftEnvironment(); - this.partner = busTopicParams.getPartner(); - - this.latitude = busTopicParams.getLatitude(); - this.longitude = busTopicParams.getLongitude(); - - this.additionalProps = busTopicParams.getAdditionalProps(); - } - - - @Override - public void init() { - if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .userName(this.userName) - .password(this.password) - .useHttps(this.useHttps) - .allowTracing(this.allowTracing) - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else { - this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .userName(this.userName) - .password(this.password) - .environment(this.environment) - .aftEnvironment(this.aftEnvironment) - .partner(this.partner) - .latitude(this.latitude) - .longitude(this.longitude) - .additionalProps(this.additionalProps) - .useHttps(this.useHttps) - .allowTracing(this.allowTracing) - .build()); - } - - logger.info("{}: DMAAP SINK created", this); - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.DMAAP; - } - - - @Override - public String toString() { - return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password - + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString() - + "]"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java deleted file mode 100644 index 26960379..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ /dev/null @@ -1,139 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import java.net.MalformedURLException; -import java.util.Map; -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This topic reader implementation specializes in reading messages over DMAAP topic and notifying - * its listeners. - */ -public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable { - - private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class); - - - protected final String userName; - protected final String password; - - protected String environment = null; - protected String aftEnvironment = null; - protected String partner = null; - protected String latitude = null; - protected String longitude = null; - - protected Map<String, String> additionalProps = null; - - - /** - * Constructor. - * - * @param busTopicParams Parameters object containing all the required inputs - * - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) { - - super(busTopicParams); - - this.userName = busTopicParams.getUserName(); - this.password = busTopicParams.getPassword(); - - this.environment = busTopicParams.getEnvironment(); - this.aftEnvironment = busTopicParams.getAftEnvironment(); - this.partner = busTopicParams.getPartner(); - - this.latitude = busTopicParams.getLatitude(); - this.longitude = busTopicParams.getLongitude(); - - this.additionalProps = busTopicParams.getAdditionalProps(); - try { - this.init(); - } catch (Exception e) { - throw new IllegalArgumentException("ERROR during init in dmaap-source: cannot create topic " + topic, e); - } - } - - - /** - * Initialize the Cambria or MR Client. - */ - @Override - public void init() throws MalformedURLException { - BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) - .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps) - .allowTracing(this.allowTracing); - - if (anyNullOrEmpty(this.userName, this.password)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(builder - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(builder - .userName(this.userName) - .password(this.password) - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else { - this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder - .userName(this.userName) - .password(this.password) - .environment(this.environment) - .aftEnvironment(this.aftEnvironment) - .partner(this.partner) - .latitude(this.latitude) - .longitude(this.longitude) - .additionalProps(this.additionalProps) - .build()); - } - - logger.info("{}: INITTED", this); - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.DMAAP; - } - - @Override - public String toString() { - return "SingleThreadedDmaapTopicSource [userName=" + userName - + ", password=" + (password == null || password.isEmpty() ? "-" : password.length()) - + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() - + ", toString()=" + super.toString() + "]"; - } - - -} |