aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
diff options
context:
space:
mode:
authoradheli.tavares <adheli.tavares@est.tech>2023-09-28 14:25:43 +0100
committeradheli.tavares <adheli.tavares@est.tech>2023-09-29 10:30:58 +0100
commitcf36274c5ae0bc569ec7ebe2cb4e8f579763cc14 (patch)
treec9a9403714185944ca9ad0f93cd1478072b748b2 /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
parent349b4ae7179173f9261d9a432094cb55dc433820 (diff)
Fix security vulnerabilities
- iq nexus vulnerabilities - sonar security hotspots and code smell Issue-ID: POLICY-4761 Issue-ID: POLICY-4833 Change-Id: Iab2e07d2ee7b90031bc5a30210ce7d3f5a47b3fd Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java238
1 files changed, 95 insertions, 143 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 8542d572..79e374a2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -5,7 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
- * Copyright (C) 2022 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,7 +31,6 @@ import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -46,6 +45,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.jetbrains.annotations.NotNull;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
@@ -76,8 +76,8 @@ public interface BusConsumer {
/**
* Consumer that handles fetch() failures by sleeping.
*/
- public abstract static class FetchingBusConsumer implements BusConsumer {
- private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
+ abstract class FetchingBusConsumer implements BusConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
/**
* Fetch timeout.
@@ -158,18 +158,16 @@ public interface BusConsumer {
/**
* Cambria Consumer Wrapper.
* BusTopicParam object contains the following parameters
- * servers messaging bus hosts.
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
+ * servers - messaging bus hosts.
+ * topic - topic for messages
+ * apiKey - API Key
+ * apiSecret - API Secret
+ * consumerGroup - Consumer Group
+ * consumerInstance - Consumer Instance
+ * fetchTimeout - Fetch Timeout
+ * fetchLimit - Fetch Limit
*
* @param busTopicParams - The parameters for the bus topic
- * @throws GeneralSecurityException - Security exception
- * @throws MalformedURLException - Malformed URL exception
*/
public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
super(busTopicParams);
@@ -177,8 +175,8 @@ public interface BusConsumer {
this.builder = new CambriaClientBuilders.ConsumerBuilder();
builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
- .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
- .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
+ .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
+ .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
// Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
builder.withSocketTimeout(fetchTimeout + 30000);
@@ -232,12 +230,12 @@ public interface BusConsumer {
/**
* Kafka based consumer.
*/
- public static class KafkaConsumerWrapper extends FetchingBusConsumer {
+ class KafkaConsumerWrapper extends FetchingBusConsumer {
/**
* logger.
*/
- private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
@@ -249,15 +247,13 @@ public interface BusConsumer {
/**
* Kafka Consumer Wrapper.
- * BusTopicParam object contains the following parameters
- * servers messaging bus hosts.
- * topic topic
+ * BusTopicParam - object contains the following parameters
+ * servers - messaging bus hosts.
+ * topic - topic
*
* @param busTopicParams - The parameters for the bus topic
- * @throws GeneralSecurityException - Security exception
- * @throws MalformedURLException - Malformed URL exception
*/
- public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
+ public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
super(busTopicParams);
if (busTopicParams.isTopicInvalid()) {
@@ -267,12 +263,10 @@ public interface BusConsumer {
//Setup Properties for consumer
kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- busTopicParams.getServers().get(0));
+ busTopicParams.getServers().get(0));
if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- kafkaProps.put(entry.getKey(), entry.getValue());
- }
+ kafkaProps.putAll(busTopicParams.getAdditionalProps());
}
if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
@@ -286,11 +280,11 @@ public interface BusConsumer {
}
consumer = new KafkaConsumer<>(kafkaProps);
//Subscribe to the topic
- consumer.subscribe(Arrays.asList(busTopicParams.getTopic()));
+ consumer.subscribe(List.of(busTopicParams.getTopic()));
}
@Override
- public Iterable<String> fetch() throws IOException {
+ public Iterable<String> fetch() {
ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
if (records == null || records.count() <= 0) {
return Collections.emptyList();
@@ -306,7 +300,7 @@ public interface BusConsumer {
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
} catch (Exception e) {
- logger.error("{}: cannot fetch because of {}", this, e.getMessage());
+ logger.error("{}: cannot fetch, throwing exception after sleep...", this);
sleepAfterFetchFailure();
throw e;
}
@@ -334,7 +328,7 @@ public interface BusConsumer {
/**
* logger.
*/
- private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
/**
* Name of the "protocol" property.
@@ -349,16 +343,16 @@ public interface BusConsumer {
/**
* MR Consumer Wrapper.
*
- * <p>servers messaging bus hosts
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * username AAF Login
- * password AAF Password
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
+ * <p>servers - messaging bus hosts
+ * topic - topic
+ * apiKey - API Key
+ * apiSecret - API Secret
+ * username - AAF Login
+ * password - AAF Password
+ * consumerGroup - Consumer Group
+ * consumerInstance - Consumer Instance
+ * fetchTimeout - Fetch Timeout
+ * fetchLimit - Fetch Limit
*
* @param busTopicParams contains above listed attributes
* @throws MalformedURLException URL should be valid
@@ -371,22 +365,22 @@ public interface BusConsumer {
}
this.consumer = new MRConsumerImplBuilder()
- .setHostPart(busTopicParams.getServers())
- .setTopic(busTopicParams.getTopic())
- .setConsumerGroup(busTopicParams.getConsumerGroup())
- .setConsumerId(busTopicParams.getConsumerInstance())
- .setTimeoutMs(busTopicParams.getFetchTimeout())
- .setLimit(busTopicParams.getFetchLimit())
- .setApiKey(busTopicParams.getApiKey())
- .setApiSecret(busTopicParams.getApiSecret())
- .createMRConsumerImpl();
+ .setHostPart(busTopicParams.getServers())
+ .setTopic(busTopicParams.getTopic())
+ .setConsumerGroup(busTopicParams.getConsumerGroup())
+ .setConsumerId(busTopicParams.getConsumerInstance())
+ .setTimeoutMs(busTopicParams.getFetchTimeout())
+ .setLimit(busTopicParams.getFetchLimit())
+ .setApiKey(busTopicParams.getApiKey())
+ .setApiSecret(busTopicParams.getApiSecret())
+ .createMRConsumerImpl();
this.consumer.setUsername(busTopicParams.getUserName());
this.consumer.setPassword(busTopicParams.getPassword());
}
@Override
- public Iterable<String> fetch() throws IOException {
+ public Iterable<String> fetch() {
final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
if (response == null) {
logger.warn("{}: DMaaP NULL response received", this);
@@ -395,12 +389,12 @@ public interface BusConsumer {
return new ArrayList<>();
} else {
logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ response.getResponseMessage());
if (!"200".equals(response.getResponseCode())) {
logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ response.getResponseMessage());
sleepAfterFetchFailure();
@@ -424,35 +418,33 @@ public interface BusConsumer {
@Override
public String toString() {
return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
+ + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
+ + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
+ + consumer.getUsername() + "]";
}
}
/**
* MR based consumer.
*/
- public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
+ class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
- private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- private final Properties props;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
/**
* BusTopicParams contain the following parameters.
* MR Consumer Wrapper.
*
* <p>servers messaging bus hosts
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * aafLogin AAF Login
- * aafPassword AAF Password
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
+ * topic - topic
+ * apiKey - API Key
+ * apiSecret - API Secret
+ * aafLogin - AAF Login
+ * aafPassword - AAF Password
+ * consumerGroup - Consumer Group
+ * consumerInstance - Consumer Instance
+ * fetchTimeout - Fetch Timeout
+ * fetchLimit - Fetch Limit
*
* @param busTopicParams contains above listed params
* @throws MalformedURLException URL should be valid
@@ -468,7 +460,7 @@ public interface BusConsumer {
this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
- props = new Properties();
+ Properties props = new Properties();
if (busTopicParams.isUseHttps()) {
props.setProperty(PROTOCOL_PROP, "https");
@@ -488,23 +480,20 @@ public interface BusConsumer {
final MRConsumerImpl consumer = this.consumer;
return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
+ + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
+ + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
+ + consumer.getUsername() + "]";
}
}
- public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
+ class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
- private final Properties props;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
/**
* Constructor.
*
- * @param busTopicParams topic paramters
- *
+ * @param busTopicParams topic parameters
* @throws MalformedURLException must provide a valid URL
*/
public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
@@ -514,36 +503,21 @@ public interface BusConsumer {
final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null);
+ ? busTopicParams.getAdditionalProps().get(
+ PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
+ : null);
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
+ BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
+ && StringUtils.isBlank(dme2RouteOffer)) {
throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+ + "." + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
}
final String serviceName = busTopicParams.getServers().get(0);
@@ -553,7 +527,18 @@ public interface BusConsumer {
this.consumer.setUsername(busTopicParams.getUserName());
this.consumer.setPassword(busTopicParams.getPassword());
- props = new Properties();
+ Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer);
+
+ MRClientFactory.prop = props;
+ this.consumer.setProps(props);
+
+ logger.info("{}: CREATION", this);
+ }
+
+ @NotNull
+ private static Properties getProperties(BusTopicParams busTopicParams, String serviceName,
+ String dme2RouteOffer) {
+ Properties props = new Properties();
props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
@@ -563,29 +548,8 @@ public interface BusConsumer {
/* These are required, no defaults */
props.setProperty("topic", busTopicParams.getTopic());
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
+ BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
- }
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "GET");
if (busTopicParams.isUseHttps()) {
@@ -598,21 +562,9 @@ public interface BusConsumer {
props.setProperty("contenttype", "application/json");
if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
+ props.putAll(busTopicParams.getAdditionalProps());
}
-
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
-
- logger.info("{}: CREATION", this);
- }
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
-
+ return props;
}
}
}