aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal
diff options
context:
space:
mode:
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-01-25 14:25:37 +0000
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-02-07 14:59:50 +0000
commitbeeb5e3846c8457987711491bcb58a6567870591 (patch)
tree227cdd91b00f3889a1109d457f968472cb7ee596 /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal
parent0241c0aa14447c99fccecc61e91b35051d6743be (diff)
Remove Dmaap references from policy-common
- updated dependencies for jakarta.* compabilities - other dependency updates for security fixes Issue-ID: POLICY-4881 Change-Id: I979d944fcd21279f618d1bcbfe12e914ba30077f Signed-off-by: rameshiyer27 <ramesh.murugan.iyer@est.tech>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java253
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java95
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java209
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java18
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java132
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java139
7 files changed, 12 insertions, 838 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index d6fa0645..539a78c2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -55,11 +55,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.jetbrains.annotations.NotNull;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -377,254 +372,6 @@ public interface BusConsumer {
return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
}
}
-
- /**
- * MR based consumer.
- */
- public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
-
- /**
- * logger.
- */
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
-
- /**
- * Name of the "protocol" property.
- */
- protected static final String PROTOCOL_PROP = "Protocol";
-
- /**
- * MR Consumer.
- */
- protected MRConsumerImpl consumer;
-
- /**
- * MR Consumer Wrapper.
- *
- * <p>servers - messaging bus hosts
- * topic - topic
- * apiKey - API Key
- * apiSecret - API Secret
- * username - AAF Login
- * password - AAF Password
- * consumerGroup - Consumer Group
- * consumerInstance - Consumer Instance
- * fetchTimeout - Fetch Timeout
- * fetchLimit - Fetch Limit
- *
- * @param busTopicParams contains above listed attributes
- * @throws MalformedURLException URL should be valid
- */
- protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
- super(busTopicParams);
-
- if (busTopicParams.isTopicInvalid()) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
- this.consumer = new MRConsumerImplBuilder()
- .setHostPart(busTopicParams.getServers())
- .setTopic(busTopicParams.getTopic())
- .setConsumerGroup(busTopicParams.getConsumerGroup())
- .setConsumerId(busTopicParams.getConsumerInstance())
- .setTimeoutMs(busTopicParams.getFetchTimeout())
- .setLimit(busTopicParams.getFetchLimit())
- .setApiKey(busTopicParams.getApiKey())
- .setApiSecret(busTopicParams.getApiSecret())
- .createMRConsumerImpl();
-
- this.consumer.setUsername(busTopicParams.getUserName());
- this.consumer.setPassword(busTopicParams.getPassword());
- }
-
- @Override
- public Iterable<String> fetch() {
- final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- sleepAfterFetchFailure();
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- if (!"200".equals(response.getResponseCode())) {
-
- logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- sleepAfterFetchFailure();
-
- /* fall through */
- }
- }
-
- if (response.getActualMessages() == null) {
- return new ArrayList<>();
- } else {
- return response.getActualMessages();
- }
- }
-
- @Override
- public void close() {
- super.close();
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
- }
-
- /**
- * MR based consumer.
- */
- class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- /**
- * BusTopicParams contain the following parameters.
- * MR Consumer Wrapper.
- *
- * <p>servers messaging bus hosts
- * topic - topic
- * apiKey - API Key
- * apiSecret - API Secret
- * aafLogin - AAF Login
- * aafPassword - AAF Password
- * consumerGroup - Consumer Group
- * consumerInstance - Consumer Instance
- * fetchTimeout - Fetch Timeout
- * fetchLimit - Fetch Limit
- *
- * @param busTopicParams contains above listed params
- * @throws MalformedURLException URL should be valid
- */
- public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
- super(busTopicParams);
-
- // super constructor sets servers = {""} if empty to avoid errors when using DME2
- if (busTopicParams.isServersInvalid()) {
- throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
- }
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- Properties props = new Properties();
-
- if (busTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
- }
-
- this.consumer.setProps(props);
- logger.info("{}: CREATION", this);
- }
-
- @Override
- public String toString() {
- final MRConsumerImpl consumer = this.consumer;
-
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
- }
-
- class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic parameters
- * @throws MalformedURLException must provide a valid URL
- */
- public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
-
- super(busTopicParams);
-
-
- final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null);
-
- BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
-
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
-
- final String serviceName = busTopicParams.getServers().get(0);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- this.consumer.setUsername(busTopicParams.getUserName());
- this.consumer.setPassword(busTopicParams.getPassword());
-
- Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer);
-
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
-
- logger.info("{}: CREATION", this);
- }
-
- @NotNull
- private static Properties getProperties(BusTopicParams busTopicParams, String serviceName,
- String dme2RouteOffer) {
- Properties props = new Properties();
-
- props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- props.setProperty("username", busTopicParams.getUserName());
- props.setProperty("password", busTopicParams.getPassword());
-
- /* These are required, no defaults */
- props.setProperty("topic", busTopicParams.getTopic());
-
- BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
-
- props.setProperty("MethodType", "GET");
-
- if (busTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- }
-
- props.setProperty("contenttype", "application/json");
-
- if (busTopicParams.isAdditionalPropsValid()) {
- props.putAll(busTopicParams.getAdditionalProps());
- }
- return props;
- }
- }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java
deleted file mode 100644
index 298607b5..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP POLICY
- * ================================================================================
- * Copyright (C) 2023 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END============================================
- * ===================================================================
- *
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import java.util.Properties;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-
-public class BusHelper {
-
- private BusHelper() {
- /* no constructor */
- }
-
- /**
- * Complete the properties param with common fields for both BusConsumer and BusPublisher.
- * @param busTopicParams topics
- * @param dme2RouteOffer route
- * @param props properties
- */
- public static void setCommonProperties(BusTopicParams busTopicParams, String dme2RouteOffer, Properties props) {
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
-
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
- }
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- }
-
- /**
- * Throws exception when any of the checks are invalid.
- * @param busTopicParams topics
- * @param topicType topic type (sink or source)
- */
- public static void validateBusTopicParams(BusTopicParams busTopicParams, String topicType) {
- if (busTopicParams.isEnvironmentInvalid()) {
- throw paramException(busTopicParams.getTopic(), topicType,
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw paramException(busTopicParams.getTopic(), topicType,
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw paramException(busTopicParams.getTopic(), topicType,
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw paramException(busTopicParams.getTopic(), topicType,
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
- }
-
- private static IllegalArgumentException paramException(String topic, String topicType, String propertyName) {
- return new IllegalArgumentException("Missing " + topicType + "."
- + topic + propertyName + " property for DME2 in DMaaP");
-
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index def8f841..e2adde0d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -5,7 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,21 +29,12 @@ import com.att.nsa.cambria.client.CambriaClientBuilders;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -239,202 +230,4 @@ public interface BusPublisher {
}
}
-
- /**
- * DmaapClient library wrapper.
- */
- abstract class DmaapPublisherWrapper implements BusPublisher {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
-
- /**
- * MR based Publisher.
- */
- protected MRSimplerBatchPublisher publisher;
- protected Properties props;
-
- /**
- * MR Publisher Wrapper.
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param username AAF or DME2 Login
- * @param password AAF or DME2 Password
- */
- protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
- String username, String password, boolean useHttps) {
-
- if (StringUtils.isBlank(topic)) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
- configureProtocol(topic, protocol, servers, useHttps);
-
- this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
-
- this.publisher.setUsername(username);
- this.publisher.setPassword(password);
-
- props = new Properties();
-
- props.setProperty("Protocol", (useHttps ? "https" : "http"));
- props.setProperty("contenttype", "application/json");
- props.setProperty("username", username);
- props.setProperty("password", password);
-
- props.setProperty("topic", topic);
-
- this.publisher.setProps(props);
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- this.publisher.setHost(servers.get(0));
- }
-
- logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
- }
-
- private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
- boolean useHttps) {
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
- }
-
- ArrayList<String> dmaapServers = new ArrayList<>();
- String port = useHttps ? ":3905" : ":3904";
- for (String server : servers) {
- dmaapServers.add(server + port);
- }
-
- this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- } else if (protocol == ProtocolTypeConstants.DME2) {
- ArrayList<String> dmaapServers = new ArrayList<>();
- dmaapServers.add("0.0.0.0:3904");
-
- this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- } else {
- throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
- }
- }
-
- @Override
- public void close() {
- logger.info(LOG_CLOSE, this);
-
- try {
- this.publisher.close(1, TimeUnit.SECONDS);
-
- } catch (InterruptedException e) {
- logger.warn(LOG_CLOSE_FAILED, this, e);
- Thread.currentThread().interrupt();
-
- } catch (Exception e) {
- logger.warn(LOG_CLOSE_FAILED, this, e);
- }
- }
-
- @Override
- public boolean send(String partitionId, String message) {
- if (message == null) {
- throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
- }
-
- this.publisher.setPubResponse(new MRPublisherResponse());
- this.publisher.send(partitionId, message);
- MRPublisherResponse response = this.publisher.sendBatchWithResponse();
- if (response != null) {
- logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
- }
-
- return true;
- }
-
- @Override
- public String toString() {
- return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
- + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
- + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
- + ", publisher.getUsername()=" + publisher.getUsername() + "]";
- }
- }
-
- /**
- * DmaapClient library wrapper.
- */
- class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
- /**
- * MR based Publisher.
- */
- public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
- boolean useHttps) {
-
- super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
- }
- }
-
- class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic parameters
- */
- public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
-
- super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
- busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
-
- String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null;
-
- validateParams(busTopicParams, dme2RouteOffer);
-
- String serviceName = busTopicParams.getServers().get(0);
-
- /* These are required, no defaults */
- props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
-
- props.setProperty("MethodType", "POST");
-
- if (busTopicParams.isAdditionalPropsValid()) {
- addAdditionalProps(busTopicParams);
- }
-
- this.publisher.setProps(props);
- }
-
- private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
- BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
-
- if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException("Must provide at least "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
- }
-
- private void addAdditionalProps(BusTopicParams busTopicParams) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- if (value != null) {
- props.setProperty(key, value);
- }
- }
- }
- }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
index 7cc8f8b6..53a6ab66 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
* Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019, 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,14 +33,14 @@ import org.apache.commons.lang3.StringUtils;
/**
* Member variables of this Params class are as follows.
*
- * <p>servers DMaaP servers
- * topic DMaaP Topic to be monitored
- * apiKey DMaaP API Key (optional)
- * apiSecret DMaaP API Secret (optional)
- * consumerGroup DMaaP Reader Consumer Group
- * consumerInstance DMaaP Reader Instance
- * fetchTimeout DMaaP fetch timeout
- * fetchLimit DMaaP fetch limit
+ * <p>servers Kafka servers
+ * topic Kafka Topic to be monitored
+ * apiKey Kafka API Key (optional)
+ * apiSecret Kafka API Secret (optional)
+ * consumerGroup kafka Reader Consumer Group
+ * consumerInstance Kafka Reader Instance
+ * fetchTimeout kafka fetch timeout
+ * fetchLimit Kafka fetch limit
* environment DME2 Environment
* aftEnvironment DME2 AFT Environment
* partner DME2 Partner
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
index 7c740abf..dfdc7b3c 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
@@ -5,7 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
/**
* Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
- * regardless if it is UEB or DMaaP.
+ * regardless if it is UEB or Kafka.
*
*/
public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
deleted file mode 100644
index 771efb33..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import java.util.Map;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This implementation publishes events for the associated DMAAP topic, inline with the calling
- * thread.
- */
-public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
-
- protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class);
-
- protected final String userName;
- protected final String password;
-
- protected String environment = null;
- protected String aftEnvironment = null;
- protected String partner = null;
- protected String latitude = null;
- protected String longitude = null;
-
- protected Map<String, String> additionalProps = null;
-
- /**
- * BusTopicParams contains the below mentioned attributes.
- * servers DMaaP servers
- * topic DMaaP Topic to be monitored
- * apiKey DMaaP API Key (optional)
- * apiSecret DMaaP API Secret (optional)
- * environment DME2 Environment
- * aftEnvironment DME2 AFT Environment
- * partner DME2 Partner
- * latitude DME2 Latitude
- * longitude DME2 Longitude
- * additionalProps Additional properties to pass to DME2
- * useHttps does connection use HTTPS?
- * allowTracing is tracing allowed?
- * allowSelfSignedCerts are self-signed certificates allow
- * @param busTopicParams Contains the above mentioned parameters
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public InlineDmaapTopicSink(BusTopicParams busTopicParams) {
-
- super(busTopicParams);
-
- this.userName = busTopicParams.getUserName();
- this.password = busTopicParams.getPassword();
-
- this.environment = busTopicParams.getEnvironment();
- this.aftEnvironment = busTopicParams.getAftEnvironment();
- this.partner = busTopicParams.getPartner();
-
- this.latitude = busTopicParams.getLatitude();
- this.longitude = busTopicParams.getLongitude();
-
- this.additionalProps = busTopicParams.getAdditionalProps();
- }
-
-
- @Override
- public void init() {
- if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .userName(this.userName)
- .password(this.password)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else {
- this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .userName(this.userName)
- .password(this.password)
- .environment(this.environment)
- .aftEnvironment(this.aftEnvironment)
- .partner(this.partner)
- .latitude(this.latitude)
- .longitude(this.longitude)
- .additionalProps(this.additionalProps)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing)
- .build());
- }
-
- logger.info("{}: DMAAP SINK created", this);
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.DMAAP;
- }
-
-
- @Override
- public String toString() {
- return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
- + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString()
- + "]";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
deleted file mode 100644
index 26960379..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import java.net.MalformedURLException;
-import java.util.Map;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
- * its listeners.
- */
-public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
-
- private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
-
-
- protected final String userName;
- protected final String password;
-
- protected String environment = null;
- protected String aftEnvironment = null;
- protected String partner = null;
- protected String latitude = null;
- protected String longitude = null;
-
- protected Map<String, String> additionalProps = null;
-
-
- /**
- * Constructor.
- *
- * @param busTopicParams Parameters object containing all the required inputs
- *
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
-
- super(busTopicParams);
-
- this.userName = busTopicParams.getUserName();
- this.password = busTopicParams.getPassword();
-
- this.environment = busTopicParams.getEnvironment();
- this.aftEnvironment = busTopicParams.getAftEnvironment();
- this.partner = busTopicParams.getPartner();
-
- this.latitude = busTopicParams.getLatitude();
- this.longitude = busTopicParams.getLongitude();
-
- this.additionalProps = busTopicParams.getAdditionalProps();
- try {
- this.init();
- } catch (Exception e) {
- throw new IllegalArgumentException("ERROR during init in dmaap-source: cannot create topic " + topic, e);
- }
- }
-
-
- /**
- * Initialize the Cambria or MR Client.
- */
- @Override
- public void init() throws MalformedURLException {
- BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
- .fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing);
-
- if (anyNullOrEmpty(this.userName, this.password)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
- .userName(this.userName)
- .password(this.password)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else {
- this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder
- .userName(this.userName)
- .password(this.password)
- .environment(this.environment)
- .aftEnvironment(this.aftEnvironment)
- .partner(this.partner)
- .latitude(this.latitude)
- .longitude(this.longitude)
- .additionalProps(this.additionalProps)
- .build());
- }
-
- logger.info("{}: INITTED", this);
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.DMAAP;
- }
-
- @Override
- public String toString() {
- return "SingleThreadedDmaapTopicSource [userName=" + userName
- + ", password=" + (password == null || password.isEmpty() ? "-" : password.length())
- + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure()
- + ", toString()=" + super.toString() + "]";
- }
-
-
-}