diff options
author | adheli.tavares <adheli.tavares@est.tech> | 2023-09-28 14:25:43 +0100 |
---|---|---|
committer | adheli.tavares <adheli.tavares@est.tech> | 2023-09-29 10:30:58 +0100 |
commit | cf36274c5ae0bc569ec7ebe2cb4e8f579763cc14 (patch) | |
tree | c9a9403714185944ca9ad0f93cd1478072b748b2 /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | |
parent | 349b4ae7179173f9261d9a432094cb55dc433820 (diff) |
Fix security vulnerabilities
- iq nexus vulnerabilities
- sonar security hotspots and code smell
Issue-ID: POLICY-4761
Issue-ID: POLICY-4833
Change-Id: Iab2e07d2ee7b90031bc5a30210ce7d3f5a47b3fd
Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | 238 |
1 files changed, 95 insertions, 143 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 8542d572..79e374a2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -5,7 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. - * Copyright (C) 2022 Nordix Foundation. + * Modifications Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,6 @@ import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -46,6 +45,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.jetbrains.annotations.NotNull; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; @@ -76,8 +76,8 @@ public interface BusConsumer { /** * Consumer that handles fetch() failures by sleeping. */ - public abstract static class FetchingBusConsumer implements BusConsumer { - private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class); + abstract class FetchingBusConsumer implements BusConsumer { + private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class); /** * Fetch timeout. @@ -158,18 +158,16 @@ public interface BusConsumer { /** * Cambria Consumer Wrapper. * BusTopicParam object contains the following parameters - * servers messaging bus hosts. - * topic topic - * apiKey API Key - * apiSecret API Secret - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * servers - messaging bus hosts. + * topic - topic for messages + * apiKey - API Key + * apiSecret - API Secret + * consumerGroup - Consumer Group + * consumerInstance - Consumer Instance + * fetchTimeout - Fetch Timeout + * fetchLimit - Fetch Limit * * @param busTopicParams - The parameters for the bus topic - * @throws GeneralSecurityException - Security exception - * @throws MalformedURLException - Malformed URL exception */ public CambriaConsumerWrapper(BusTopicParams busTopicParams) { super(busTopicParams); @@ -177,8 +175,8 @@ public interface BusConsumer { this.builder = new CambriaClientBuilders.ConsumerBuilder(); builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) - .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) - .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); + .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) + .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) builder.withSocketTimeout(fetchTimeout + 30000); @@ -232,12 +230,12 @@ public interface BusConsumer { /** * Kafka based consumer. */ - public static class KafkaConsumerWrapper extends FetchingBusConsumer { + class KafkaConsumerWrapper extends FetchingBusConsumer { /** * logger. */ - private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; @@ -249,15 +247,13 @@ public interface BusConsumer { /** * Kafka Consumer Wrapper. - * BusTopicParam object contains the following parameters - * servers messaging bus hosts. - * topic topic + * BusTopicParam - object contains the following parameters + * servers - messaging bus hosts. + * topic - topic * * @param busTopicParams - The parameters for the bus topic - * @throws GeneralSecurityException - Security exception - * @throws MalformedURLException - Malformed URL exception */ - public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { super(busTopicParams); if (busTopicParams.isTopicInvalid()) { @@ -267,12 +263,10 @@ public interface BusConsumer { //Setup Properties for consumer kafkaProps = new Properties(); kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - busTopicParams.getServers().get(0)); + busTopicParams.getServers().get(0)); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - kafkaProps.put(entry.getKey(), entry.getValue()); - } + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) { @@ -286,11 +280,11 @@ public interface BusConsumer { } consumer = new KafkaConsumer<>(kafkaProps); //Subscribe to the topic - consumer.subscribe(Arrays.asList(busTopicParams.getTopic())); + consumer.subscribe(List.of(busTopicParams.getTopic())); } @Override - public Iterable<String> fetch() throws IOException { + public Iterable<String> fetch() { ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout)); if (records == null || records.count() <= 0) { return Collections.emptyList(); @@ -306,7 +300,7 @@ public interface BusConsumer { consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } catch (Exception e) { - logger.error("{}: cannot fetch because of {}", this, e.getMessage()); + logger.error("{}: cannot fetch, throwing exception after sleep...", this); sleepAfterFetchFailure(); throw e; } @@ -334,7 +328,7 @@ public interface BusConsumer { /** * logger. */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); /** * Name of the "protocol" property. @@ -349,16 +343,16 @@ public interface BusConsumer { /** * MR Consumer Wrapper. * - * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * username AAF Login - * password AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * <p>servers - messaging bus hosts + * topic - topic + * apiKey - API Key + * apiSecret - API Secret + * username - AAF Login + * password - AAF Password + * consumerGroup - Consumer Group + * consumerInstance - Consumer Instance + * fetchTimeout - Fetch Timeout + * fetchLimit - Fetch Limit * * @param busTopicParams contains above listed attributes * @throws MalformedURLException URL should be valid @@ -371,22 +365,22 @@ public interface BusConsumer { } this.consumer = new MRConsumerImplBuilder() - .setHostPart(busTopicParams.getServers()) - .setTopic(busTopicParams.getTopic()) - .setConsumerGroup(busTopicParams.getConsumerGroup()) - .setConsumerId(busTopicParams.getConsumerInstance()) - .setTimeoutMs(busTopicParams.getFetchTimeout()) - .setLimit(busTopicParams.getFetchLimit()) - .setApiKey(busTopicParams.getApiKey()) - .setApiSecret(busTopicParams.getApiSecret()) - .createMRConsumerImpl(); + .setHostPart(busTopicParams.getServers()) + .setTopic(busTopicParams.getTopic()) + .setConsumerGroup(busTopicParams.getConsumerGroup()) + .setConsumerId(busTopicParams.getConsumerInstance()) + .setTimeoutMs(busTopicParams.getFetchTimeout()) + .setLimit(busTopicParams.getFetchLimit()) + .setApiKey(busTopicParams.getApiKey()) + .setApiSecret(busTopicParams.getApiSecret()) + .createMRConsumerImpl(); this.consumer.setUsername(busTopicParams.getUserName()); this.consumer.setPassword(busTopicParams.getPassword()); } @Override - public Iterable<String> fetch() throws IOException { + public Iterable<String> fetch() { final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); if (response == null) { logger.warn("{}: DMaaP NULL response received", this); @@ -395,12 +389,12 @@ public interface BusConsumer { return new ArrayList<>(); } else { logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), - response.getResponseMessage()); + response.getResponseMessage()); if (!"200".equals(response.getResponseCode())) { logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + response.getResponseMessage()); sleepAfterFetchFailure(); @@ -424,35 +418,33 @@ public interface BusConsumer { @Override public String toString() { return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() + + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" + + consumer.getUsername() + "]"; } } /** * MR based consumer. */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private final Properties props; + private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); /** * BusTopicParams contain the following parameters. * MR Consumer Wrapper. * * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * aafLogin AAF Login - * aafPassword AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * topic - topic + * apiKey - API Key + * apiSecret - API Secret + * aafLogin - AAF Login + * aafPassword - AAF Password + * consumerGroup - Consumer Group + * consumerInstance - Consumer Instance + * fetchTimeout - Fetch Timeout + * fetchLimit - Fetch Limit * * @param busTopicParams contains above listed params * @throws MalformedURLException URL should be valid @@ -468,7 +460,7 @@ public interface BusConsumer { this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - props = new Properties(); + Properties props = new Properties(); if (busTopicParams.isUseHttps()) { props.setProperty(PROTOCOL_PROP, "https"); @@ -488,23 +480,20 @@ public interface BusConsumer { final MRConsumerImpl consumer = this.consumer; return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() + + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" + + consumer.getUsername() + "]"; } } - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); + class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - private final Properties props; + private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); /** * Constructor. * - * @param busTopicParams topic paramters - * + * @param busTopicParams topic parameters * @throws MalformedURLException must provide a valid URL */ public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { @@ -514,36 +503,21 @@ public interface BusConsumer { final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null); + ? busTopicParams.getAdditionalProps().get( + PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) + : null); - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } + BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { + && StringUtils.isBlank(dme2RouteOffer)) { throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); } final String serviceName = busTopicParams.getServers().get(0); @@ -553,7 +527,18 @@ public interface BusConsumer { this.consumer.setUsername(busTopicParams.getUserName()); this.consumer.setPassword(busTopicParams.getPassword()); - props = new Properties(); + Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer); + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + logger.info("{}: CREATION", this); + } + + @NotNull + private static Properties getProperties(BusTopicParams busTopicParams, String serviceName, + String dme2RouteOffer) { + Properties props = new Properties(); props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); @@ -563,29 +548,8 @@ public interface BusConsumer { /* These are required, no defaults */ props.setProperty("topic", busTopicParams.getTopic()); - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); + BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props); - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); - } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "GET"); if (busTopicParams.isUseHttps()) { @@ -598,21 +562,9 @@ public interface BusConsumer { props.setProperty("contenttype", "application/json"); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } + props.putAll(busTopicParams.getAdditionalProps()); } - - MRClientFactory.prop = props; - this.consumer.setProps(props); - - logger.info("{}: CREATION", this); - } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); - + return props; } } } |