aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus
diff options
context:
space:
mode:
authoradheli.tavares <adheli.tavares@est.tech>2023-09-28 14:25:43 +0100
committeradheli.tavares <adheli.tavares@est.tech>2023-09-29 10:30:58 +0100
commitcf36274c5ae0bc569ec7ebe2cb4e8f579763cc14 (patch)
treec9a9403714185944ca9ad0f93cd1478072b748b2 /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus
parent349b4ae7179173f9261d9a432094cb55dc433820 (diff)
Fix security vulnerabilities
- iq nexus vulnerabilities - sonar security hotspots and code smell Issue-ID: POLICY-4761 Issue-ID: POLICY-4833 Change-Id: Iab2e07d2ee7b90031bc5a30210ce7d3f5a47b3fd Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java238
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java95
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java90
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java3
4 files changed, 215 insertions, 211 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 8542d572..79e374a2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -5,7 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
- * Copyright (C) 2022 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,7 +31,6 @@ import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -46,6 +45,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.jetbrains.annotations.NotNull;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
@@ -76,8 +76,8 @@ public interface BusConsumer {
/**
* Consumer that handles fetch() failures by sleeping.
*/
- public abstract static class FetchingBusConsumer implements BusConsumer {
- private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
+ abstract class FetchingBusConsumer implements BusConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
/**
* Fetch timeout.
@@ -158,18 +158,16 @@ public interface BusConsumer {
/**
* Cambria Consumer Wrapper.
* BusTopicParam object contains the following parameters
- * servers messaging bus hosts.
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
+ * servers - messaging bus hosts.
+ * topic - topic for messages
+ * apiKey - API Key
+ * apiSecret - API Secret
+ * consumerGroup - Consumer Group
+ * consumerInstance - Consumer Instance
+ * fetchTimeout - Fetch Timeout
+ * fetchLimit - Fetch Limit
*
* @param busTopicParams - The parameters for the bus topic
- * @throws GeneralSecurityException - Security exception
- * @throws MalformedURLException - Malformed URL exception
*/
public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
super(busTopicParams);
@@ -177,8 +175,8 @@ public interface BusConsumer {
this.builder = new CambriaClientBuilders.ConsumerBuilder();
builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
- .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
- .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
+ .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
+ .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
// Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
builder.withSocketTimeout(fetchTimeout + 30000);
@@ -232,12 +230,12 @@ public interface BusConsumer {
/**
* Kafka based consumer.
*/
- public static class KafkaConsumerWrapper extends FetchingBusConsumer {
+ class KafkaConsumerWrapper extends FetchingBusConsumer {
/**
* logger.
*/
- private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
@@ -249,15 +247,13 @@ public interface BusConsumer {
/**
* Kafka Consumer Wrapper.
- * BusTopicParam object contains the following parameters
- * servers messaging bus hosts.
- * topic topic
+ * BusTopicParam - object contains the following parameters
+ * servers - messaging bus hosts.
+ * topic - topic
*
* @param busTopicParams - The parameters for the bus topic
- * @throws GeneralSecurityException - Security exception
- * @throws MalformedURLException - Malformed URL exception
*/
- public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
+ public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
super(busTopicParams);
if (busTopicParams.isTopicInvalid()) {
@@ -267,12 +263,10 @@ public interface BusConsumer {
//Setup Properties for consumer
kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- busTopicParams.getServers().get(0));
+ busTopicParams.getServers().get(0));
if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- kafkaProps.put(entry.getKey(), entry.getValue());
- }
+ kafkaProps.putAll(busTopicParams.getAdditionalProps());
}
if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
@@ -286,11 +280,11 @@ public interface BusConsumer {
}
consumer = new KafkaConsumer<>(kafkaProps);
//Subscribe to the topic
- consumer.subscribe(Arrays.asList(busTopicParams.getTopic()));
+ consumer.subscribe(List.of(busTopicParams.getTopic()));
}
@Override
- public Iterable<String> fetch() throws IOException {
+ public Iterable<String> fetch() {
ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
if (records == null || records.count() <= 0) {
return Collections.emptyList();
@@ -306,7 +300,7 @@ public interface BusConsumer {
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
} catch (Exception e) {
- logger.error("{}: cannot fetch because of {}", this, e.getMessage());
+ logger.error("{}: cannot fetch, throwing exception after sleep...", this);
sleepAfterFetchFailure();
throw e;
}
@@ -334,7 +328,7 @@ public interface BusConsumer {
/**
* logger.
*/
- private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
/**
* Name of the "protocol" property.
@@ -349,16 +343,16 @@ public interface BusConsumer {
/**
* MR Consumer Wrapper.
*
- * <p>servers messaging bus hosts
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * username AAF Login
- * password AAF Password
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
+ * <p>servers - messaging bus hosts
+ * topic - topic
+ * apiKey - API Key
+ * apiSecret - API Secret
+ * username - AAF Login
+ * password - AAF Password
+ * consumerGroup - Consumer Group
+ * consumerInstance - Consumer Instance
+ * fetchTimeout - Fetch Timeout
+ * fetchLimit - Fetch Limit
*
* @param busTopicParams contains above listed attributes
* @throws MalformedURLException URL should be valid
@@ -371,22 +365,22 @@ public interface BusConsumer {
}
this.consumer = new MRConsumerImplBuilder()
- .setHostPart(busTopicParams.getServers())
- .setTopic(busTopicParams.getTopic())
- .setConsumerGroup(busTopicParams.getConsumerGroup())
- .setConsumerId(busTopicParams.getConsumerInstance())
- .setTimeoutMs(busTopicParams.getFetchTimeout())
- .setLimit(busTopicParams.getFetchLimit())
- .setApiKey(busTopicParams.getApiKey())
- .setApiSecret(busTopicParams.getApiSecret())
- .createMRConsumerImpl();
+ .setHostPart(busTopicParams.getServers())
+ .setTopic(busTopicParams.getTopic())
+ .setConsumerGroup(busTopicParams.getConsumerGroup())
+ .setConsumerId(busTopicParams.getConsumerInstance())
+ .setTimeoutMs(busTopicParams.getFetchTimeout())
+ .setLimit(busTopicParams.getFetchLimit())
+ .setApiKey(busTopicParams.getApiKey())
+ .setApiSecret(busTopicParams.getApiSecret())
+ .createMRConsumerImpl();
this.consumer.setUsername(busTopicParams.getUserName());
this.consumer.setPassword(busTopicParams.getPassword());
}
@Override
- public Iterable<String> fetch() throws IOException {
+ public Iterable<String> fetch() {
final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
if (response == null) {
logger.warn("{}: DMaaP NULL response received", this);
@@ -395,12 +389,12 @@ public interface BusConsumer {
return new ArrayList<>();
} else {
logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ response.getResponseMessage());
if (!"200".equals(response.getResponseCode())) {
logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ response.getResponseMessage());
sleepAfterFetchFailure();
@@ -424,35 +418,33 @@ public interface BusConsumer {
@Override
public String toString() {
return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
+ + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
+ + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
+ + consumer.getUsername() + "]";
}
}
/**
* MR based consumer.
*/
- public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
+ class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
- private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- private final Properties props;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
/**
* BusTopicParams contain the following parameters.
* MR Consumer Wrapper.
*
* <p>servers messaging bus hosts
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * aafLogin AAF Login
- * aafPassword AAF Password
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
+ * topic - topic
+ * apiKey - API Key
+ * apiSecret - API Secret
+ * aafLogin - AAF Login
+ * aafPassword - AAF Password
+ * consumerGroup - Consumer Group
+ * consumerInstance - Consumer Instance
+ * fetchTimeout - Fetch Timeout
+ * fetchLimit - Fetch Limit
*
* @param busTopicParams contains above listed params
* @throws MalformedURLException URL should be valid
@@ -468,7 +460,7 @@ public interface BusConsumer {
this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
- props = new Properties();
+ Properties props = new Properties();
if (busTopicParams.isUseHttps()) {
props.setProperty(PROTOCOL_PROP, "https");
@@ -488,23 +480,20 @@ public interface BusConsumer {
final MRConsumerImpl consumer = this.consumer;
return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
+ + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
+ + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
+ + consumer.getUsername() + "]";
}
}
- public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
+ class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
- private final Properties props;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
/**
* Constructor.
*
- * @param busTopicParams topic paramters
- *
+ * @param busTopicParams topic parameters
* @throws MalformedURLException must provide a valid URL
*/
public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
@@ -514,36 +503,21 @@ public interface BusConsumer {
final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null);
+ ? busTopicParams.getAdditionalProps().get(
+ PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
+ : null);
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
+ BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
+ && StringUtils.isBlank(dme2RouteOffer)) {
throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+ + "." + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
}
final String serviceName = busTopicParams.getServers().get(0);
@@ -553,7 +527,18 @@ public interface BusConsumer {
this.consumer.setUsername(busTopicParams.getUserName());
this.consumer.setPassword(busTopicParams.getPassword());
- props = new Properties();
+ Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer);
+
+ MRClientFactory.prop = props;
+ this.consumer.setProps(props);
+
+ logger.info("{}: CREATION", this);
+ }
+
+ @NotNull
+ private static Properties getProperties(BusTopicParams busTopicParams, String serviceName,
+ String dme2RouteOffer) {
+ Properties props = new Properties();
props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
@@ -563,29 +548,8 @@ public interface BusConsumer {
/* These are required, no defaults */
props.setProperty("topic", busTopicParams.getTopic());
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
+ BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
- }
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "GET");
if (busTopicParams.isUseHttps()) {
@@ -598,21 +562,9 @@ public interface BusConsumer {
props.setProperty("contenttype", "application/json");
if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
+ props.putAll(busTopicParams.getAdditionalProps());
}
-
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
-
- logger.info("{}: CREATION", this);
- }
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
-
+ return props;
}
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java
new file mode 100644
index 00000000..298607b5
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP POLICY
+ * ================================================================================
+ * Copyright (C) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ===================================================================
+ *
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import java.util.Properties;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+
+public class BusHelper {
+
+ private BusHelper() {
+ /* no constructor */
+ }
+
+ /**
+ * Complete the properties param with common fields for both BusConsumer and BusPublisher.
+ * @param busTopicParams topics
+ * @param dme2RouteOffer route
+ * @param props properties
+ */
+ public static void setCommonProperties(BusTopicParams busTopicParams, String dme2RouteOffer, Properties props) {
+ props.setProperty("Environment", busTopicParams.getEnvironment());
+ props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
+
+ if (busTopicParams.getPartner() != null) {
+ props.setProperty("Partner", busTopicParams.getPartner());
+ }
+ if (dme2RouteOffer != null) {
+ props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ }
+
+ props.setProperty("Latitude", busTopicParams.getLatitude());
+ props.setProperty("Longitude", busTopicParams.getLongitude());
+
+ /* These are optional, will default to these values if not set in additionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ }
+
+ /**
+ * Throws exception when any of the checks are invalid.
+ * @param busTopicParams topics
+ * @param topicType topic type (sink or source)
+ */
+ public static void validateBusTopicParams(BusTopicParams busTopicParams, String topicType) {
+ if (busTopicParams.isEnvironmentInvalid()) {
+ throw paramException(busTopicParams.getTopic(), topicType,
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+ }
+ if (busTopicParams.isAftEnvironmentInvalid()) {
+ throw paramException(busTopicParams.getTopic(), topicType,
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+ }
+ if (busTopicParams.isLatitudeInvalid()) {
+ throw paramException(busTopicParams.getTopic(), topicType,
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+ }
+ if (busTopicParams.isLongitudeInvalid()) {
+ throw paramException(busTopicParams.getTopic(), topicType,
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ }
+ }
+
+ private static IllegalArgumentException paramException(String topic, String topicType, String propertyName) {
+ return new IllegalArgumentException("Missing " + topicType + "."
+ + topic + propertyName + " property for DME2 in DMaaP");
+
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 92f7bc6f..ef8e1742 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -49,7 +49,9 @@ import org.slf4j.LoggerFactory;
public interface BusPublisher {
- public static final String NO_MESSAGE_PROVIDED = "No message provided";
+ String NO_MESSAGE_PROVIDED = "No message provided";
+ String LOG_CLOSE = "{}: CLOSE";
+ String LOG_CLOSE_FAILED = "{}: CLOSE FAILED";
/**
* sends a message.
@@ -59,19 +61,19 @@ public interface BusPublisher {
* @return true if success, false otherwise
* @throws IllegalArgumentException if no message provided
*/
- public boolean send(String partitionId, String message);
+ boolean send(String partitionId, String message);
/**
* closes the publisher.
*/
- public void close();
+ void close();
/**
* Cambria based library publisher.
*/
- public static class CambriaPublisherWrapper implements BusPublisher {
+ class CambriaPublisherWrapper implements BusPublisher {
- private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
/**
* The actual Cambria publisher.
@@ -133,7 +135,7 @@ public interface BusPublisher {
@Override
public void close() {
- logger.info("{}: CLOSE", this);
+ logger.info(LOG_CLOSE, this);
try {
this.publisher.close();
@@ -152,17 +154,17 @@ public interface BusPublisher {
/**
* Kafka based library publisher.
*/
- public static class KafkaPublisherWrapper implements BusPublisher {
+ class KafkaPublisherWrapper implements BusPublisher {
- private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- private String topic;
+ private final String topic;
/**
* Kafka publisher.
*/
- private Producer<String, String> producer;
+ private final Producer<String, String> producer;
protected Properties kafkaProps;
/**
@@ -182,9 +184,7 @@ public interface BusPublisher {
kafkaProps = new Properties();
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- kafkaProps.put(entry.getKey(), entry.getValue());
- }
+ kafkaProps.putAll(busTopicParams.getAdditionalProps());
}
if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
@@ -218,7 +218,7 @@ public interface BusPublisher {
@Override
public void close() {
- logger.info("{}: CLOSE", this);
+ logger.info(LOG_CLOSE, this);
try {
this.producer.close();
@@ -237,9 +237,9 @@ public interface BusPublisher {
/**
* DmaapClient library wrapper.
*/
- public abstract class DmaapPublisherWrapper implements BusPublisher {
+ abstract class DmaapPublisherWrapper implements BusPublisher {
- private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
/**
* MR based Publisher.
@@ -320,17 +320,17 @@ public interface BusPublisher {
@Override
public void close() {
- logger.info("{}: CLOSE", this);
+ logger.info(LOG_CLOSE, this);
try {
this.publisher.close(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- logger.warn("{}: CLOSE FAILED", this, e);
+ logger.warn(LOG_CLOSE_FAILED, this, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
- logger.warn("{}: CLOSE FAILED", this, e);
+ logger.warn(LOG_CLOSE_FAILED, this, e);
}
}
@@ -363,7 +363,7 @@ public interface BusPublisher {
/**
* DmaapClient library wrapper.
*/
- public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
+ class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
/**
* MR based Publisher.
*/
@@ -374,7 +374,7 @@ public interface BusPublisher {
}
}
- public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
+ class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
/**
* Constructor.
@@ -395,33 +395,10 @@ public interface BusPublisher {
String serviceName = busTopicParams.getServers().get(0);
/* These are required, no defaults */
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
-
props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
- }
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- // ServiceName also a default, found in additionalProps
-
- /* These are optional, will default to these values if not set in optionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
+ BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
- /* These should not change */
- props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "POST");
if (busTopicParams.isAdditionalPropsValid()) {
@@ -432,22 +409,7 @@ public interface BusPublisher {
}
private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
+ BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
throw new IllegalArgumentException("Must provide at least "
@@ -468,11 +430,5 @@ public interface BusPublisher {
}
}
}
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
-
- }
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
index daeaea13..3372e0aa 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
@@ -4,6 +4,7 @@
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,7 +38,7 @@ public abstract class TopicBase implements Topic {
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
+ private static final Logger logger = LoggerFactory.getLogger(TopicBase.class);
/**
* List of servers.