diff options
Diffstat (limited to 'policy-endpoints/src')
7 files changed, 270 insertions, 297 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index d37410e9..5ba32b28 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -3,7 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2022 Nordix Foundation. + * Modifications Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ package org.onap.policy.common.endpoints.event.comm; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Properties; import lombok.Getter; import org.onap.policy.common.capabilities.Startable; @@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory; * This implementation of the Topic Endpoint Manager, proxies operations to the appropriate * implementation(s). */ +@Getter class TopicEndpointProxy implements TopicEndpoint { /** * Logger. @@ -58,13 +60,11 @@ class TopicEndpointProxy implements TopicEndpoint { /** * Is this element locked boolean. */ - @Getter private volatile boolean locked = false; /** * Is this element alive boolean. */ - @Getter private volatile boolean alive = false; @Override @@ -77,9 +77,9 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<Topic> addTopics(TopicParameterGroup params) { List<TopicParameters> sinks = - (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList()); + (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList()); List<TopicParameters> sources = - (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList()); + (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList()); List<Topic> topics = new ArrayList<>(sinks.size() + sources.size()); topics.addAll(addTopicSources(sources)); @@ -107,7 +107,7 @@ class TopicEndpointProxy implements TopicEndpoint { break; default: logger.debug("Unknown source type {} for topic: {}", param.getTopicCommInfrastructure(), - param.getTopic()); + param.getTopic()); break; } } @@ -163,7 +163,7 @@ class TopicEndpointProxy implements TopicEndpoint { break; default: logger.debug("Unknown sink type {} for topic: {}", param.getTopicCommInfrastructure(), - param.getTopic()); + param.getTopic()); break; } } @@ -219,43 +219,33 @@ class TopicEndpointProxy implements TopicEndpoint { } final List<TopicSource> sources = new ArrayList<>(); - for (final String topic : topicNames) { + + topicNames.forEach(topic -> { try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) { - sources.add(uebSource); - } + sources.add(Objects.requireNonNull(this.getUebTopicSource(topic))); } catch (final Exception e) { logger.debug("No UEB source for topic: {}", topic, e); } try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) { - sources.add(dmaapSource); - } + sources.add(Objects.requireNonNull(this.getDmaapTopicSource(topic))); } catch (final Exception e) { logger.debug("No DMAAP source for topic: {}", topic, e); } try { - final TopicSource kafkaSource = this.getKafkaTopicSource(topic); - if (kafkaSource != null) { - sources.add(kafkaSource); - } + sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic))); } catch (final Exception e) { logger.debug("No KAFKA source for topic: {}", topic, e); } try { - final TopicSource noopSource = this.getNoopTopicSource(topic); - if (noopSource != null) { - sources.add(noopSource); - } + sources.add(Objects.requireNonNull(this.getNoopTopicSource(topic))); } catch (final Exception e) { logger.debug("No NOOP source for topic: {}", topic, e); } - } + }); + return sources; } @@ -282,37 +272,25 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSink> sinks = new ArrayList<>(); for (final String topic : topicNames) { try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) { - sinks.add(uebSink); - } + sinks.add(Objects.requireNonNull(this.getUebTopicSink(topic))); } catch (final Exception e) { logger.debug("No UEB sink for topic: {}", topic, e); } try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) { - sinks.add(dmaapSink); - } + sinks.add(Objects.requireNonNull(this.getDmaapTopicSink(topic))); } catch (final Exception e) { logger.debug("No DMAAP sink for topic: {}", topic, e); } try { - final TopicSink kafkaSink = this.getKafkaTopicSink(topic); - if (kafkaSink != null) { - sinks.add(kafkaSink); - } + sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic))); } catch (final Exception e) { logger.debug("No KAFKA sink for topic: {}", topic, e); } try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) { - sinks.add(noopSink); - } + sinks.add(Objects.requireNonNull(this.getNoopTopicSink(topic))); } catch (final Exception e) { logger.debug("No NOOP sink for topic: {}", topic, e); } @@ -323,7 +301,7 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSink> getTopicSinks(String topicName) { if (topicName == null) { - throw parmException(null); + throw paramException(null); } final List<TopicSink> sinks = new ArrayList<>(); @@ -540,49 +518,39 @@ class TopicEndpointProxy implements TopicEndpoint { public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { if (commType == null) { - throw parmException(topicName); + throw paramException(topicName); } if (topicName == null) { - throw parmException(null); + throw paramException(null); } - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - case KAFKA: - return this.getKafkaTopicSource(topicName); - case NOOP: - return this.getNoopTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } + return switch (commType) { + case UEB -> this.getUebTopicSource(topicName); + case DMAAP -> this.getDmaapTopicSource(topicName); + case KAFKA -> this.getKafkaTopicSource(topicName); + case NOOP -> this.getNoopTopicSource(topicName); + default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); + }; } @Override public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { if (commType == null) { - throw parmException(topicName); + throw paramException(topicName); } if (topicName == null) { - throw parmException(null); + throw paramException(null); } - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case KAFKA: - return this.getKafkaTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } + return switch (commType) { + case UEB -> this.getUebTopicSink(topicName); + case DMAAP -> this.getDmaapTopicSink(topicName); + case KAFKA -> this.getKafkaTopicSink(topicName); + case NOOP -> this.getNoopTopicSink(topicName); + default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); + }; } @Override @@ -625,7 +593,7 @@ class TopicEndpointProxy implements TopicEndpoint { return NoopTopicFactories.getSinkFactory().get(topicName); } - private IllegalArgumentException parmException(String topicName) { + private IllegalArgumentException paramException(String topicName) { return new IllegalArgumentException( "Invalid parameter: a communication infrastructure required to fetch " + topicName); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 8542d572..79e374a2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -5,7 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. - * Copyright (C) 2022 Nordix Foundation. + * Modifications Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,6 @@ import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -46,6 +45,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.jetbrains.annotations.NotNull; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; @@ -76,8 +76,8 @@ public interface BusConsumer { /** * Consumer that handles fetch() failures by sleeping. */ - public abstract static class FetchingBusConsumer implements BusConsumer { - private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class); + abstract class FetchingBusConsumer implements BusConsumer { + private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class); /** * Fetch timeout. @@ -158,18 +158,16 @@ public interface BusConsumer { /** * Cambria Consumer Wrapper. * BusTopicParam object contains the following parameters - * servers messaging bus hosts. - * topic topic - * apiKey API Key - * apiSecret API Secret - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * servers - messaging bus hosts. + * topic - topic for messages + * apiKey - API Key + * apiSecret - API Secret + * consumerGroup - Consumer Group + * consumerInstance - Consumer Instance + * fetchTimeout - Fetch Timeout + * fetchLimit - Fetch Limit * * @param busTopicParams - The parameters for the bus topic - * @throws GeneralSecurityException - Security exception - * @throws MalformedURLException - Malformed URL exception */ public CambriaConsumerWrapper(BusTopicParams busTopicParams) { super(busTopicParams); @@ -177,8 +175,8 @@ public interface BusConsumer { this.builder = new CambriaClientBuilders.ConsumerBuilder(); builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) - .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) - .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); + .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) + .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) builder.withSocketTimeout(fetchTimeout + 30000); @@ -232,12 +230,12 @@ public interface BusConsumer { /** * Kafka based consumer. */ - public static class KafkaConsumerWrapper extends FetchingBusConsumer { + class KafkaConsumerWrapper extends FetchingBusConsumer { /** * logger. */ - private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; @@ -249,15 +247,13 @@ public interface BusConsumer { /** * Kafka Consumer Wrapper. - * BusTopicParam object contains the following parameters - * servers messaging bus hosts. - * topic topic + * BusTopicParam - object contains the following parameters + * servers - messaging bus hosts. + * topic - topic * * @param busTopicParams - The parameters for the bus topic - * @throws GeneralSecurityException - Security exception - * @throws MalformedURLException - Malformed URL exception */ - public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { super(busTopicParams); if (busTopicParams.isTopicInvalid()) { @@ -267,12 +263,10 @@ public interface BusConsumer { //Setup Properties for consumer kafkaProps = new Properties(); kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - busTopicParams.getServers().get(0)); + busTopicParams.getServers().get(0)); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - kafkaProps.put(entry.getKey(), entry.getValue()); - } + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) { @@ -286,11 +280,11 @@ public interface BusConsumer { } consumer = new KafkaConsumer<>(kafkaProps); //Subscribe to the topic - consumer.subscribe(Arrays.asList(busTopicParams.getTopic())); + consumer.subscribe(List.of(busTopicParams.getTopic())); } @Override - public Iterable<String> fetch() throws IOException { + public Iterable<String> fetch() { ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout)); if (records == null || records.count() <= 0) { return Collections.emptyList(); @@ -306,7 +300,7 @@ public interface BusConsumer { consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } catch (Exception e) { - logger.error("{}: cannot fetch because of {}", this, e.getMessage()); + logger.error("{}: cannot fetch, throwing exception after sleep...", this); sleepAfterFetchFailure(); throw e; } @@ -334,7 +328,7 @@ public interface BusConsumer { /** * logger. */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); /** * Name of the "protocol" property. @@ -349,16 +343,16 @@ public interface BusConsumer { /** * MR Consumer Wrapper. * - * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * username AAF Login - * password AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * <p>servers - messaging bus hosts + * topic - topic + * apiKey - API Key + * apiSecret - API Secret + * username - AAF Login + * password - AAF Password + * consumerGroup - Consumer Group + * consumerInstance - Consumer Instance + * fetchTimeout - Fetch Timeout + * fetchLimit - Fetch Limit * * @param busTopicParams contains above listed attributes * @throws MalformedURLException URL should be valid @@ -371,22 +365,22 @@ public interface BusConsumer { } this.consumer = new MRConsumerImplBuilder() - .setHostPart(busTopicParams.getServers()) - .setTopic(busTopicParams.getTopic()) - .setConsumerGroup(busTopicParams.getConsumerGroup()) - .setConsumerId(busTopicParams.getConsumerInstance()) - .setTimeoutMs(busTopicParams.getFetchTimeout()) - .setLimit(busTopicParams.getFetchLimit()) - .setApiKey(busTopicParams.getApiKey()) - .setApiSecret(busTopicParams.getApiSecret()) - .createMRConsumerImpl(); + .setHostPart(busTopicParams.getServers()) + .setTopic(busTopicParams.getTopic()) + .setConsumerGroup(busTopicParams.getConsumerGroup()) + .setConsumerId(busTopicParams.getConsumerInstance()) + .setTimeoutMs(busTopicParams.getFetchTimeout()) + .setLimit(busTopicParams.getFetchLimit()) + .setApiKey(busTopicParams.getApiKey()) + .setApiSecret(busTopicParams.getApiSecret()) + .createMRConsumerImpl(); this.consumer.setUsername(busTopicParams.getUserName()); this.consumer.setPassword(busTopicParams.getPassword()); } @Override - public Iterable<String> fetch() throws IOException { + public Iterable<String> fetch() { final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); if (response == null) { logger.warn("{}: DMaaP NULL response received", this); @@ -395,12 +389,12 @@ public interface BusConsumer { return new ArrayList<>(); } else { logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), - response.getResponseMessage()); + response.getResponseMessage()); if (!"200".equals(response.getResponseCode())) { logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + response.getResponseMessage()); sleepAfterFetchFailure(); @@ -424,35 +418,33 @@ public interface BusConsumer { @Override public String toString() { return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() + + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" + + consumer.getUsername() + "]"; } } /** * MR based consumer. */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private final Properties props; + private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); /** * BusTopicParams contain the following parameters. * MR Consumer Wrapper. * * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * aafLogin AAF Login - * aafPassword AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * topic - topic + * apiKey - API Key + * apiSecret - API Secret + * aafLogin - AAF Login + * aafPassword - AAF Password + * consumerGroup - Consumer Group + * consumerInstance - Consumer Instance + * fetchTimeout - Fetch Timeout + * fetchLimit - Fetch Limit * * @param busTopicParams contains above listed params * @throws MalformedURLException URL should be valid @@ -468,7 +460,7 @@ public interface BusConsumer { this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - props = new Properties(); + Properties props = new Properties(); if (busTopicParams.isUseHttps()) { props.setProperty(PROTOCOL_PROP, "https"); @@ -488,23 +480,20 @@ public interface BusConsumer { final MRConsumerImpl consumer = this.consumer; return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() + + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" + + consumer.getUsername() + "]"; } } - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); + class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - private final Properties props; + private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); /** * Constructor. * - * @param busTopicParams topic paramters - * + * @param busTopicParams topic parameters * @throws MalformedURLException must provide a valid URL */ public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { @@ -514,36 +503,21 @@ public interface BusConsumer { final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null); + ? busTopicParams.getAdditionalProps().get( + PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) + : null); - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } + BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { + && StringUtils.isBlank(dme2RouteOffer)) { throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); } final String serviceName = busTopicParams.getServers().get(0); @@ -553,7 +527,18 @@ public interface BusConsumer { this.consumer.setUsername(busTopicParams.getUserName()); this.consumer.setPassword(busTopicParams.getPassword()); - props = new Properties(); + Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer); + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + logger.info("{}: CREATION", this); + } + + @NotNull + private static Properties getProperties(BusTopicParams busTopicParams, String serviceName, + String dme2RouteOffer) { + Properties props = new Properties(); props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); @@ -563,29 +548,8 @@ public interface BusConsumer { /* These are required, no defaults */ props.setProperty("topic", busTopicParams.getTopic()); - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); + BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props); - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); - } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "GET"); if (busTopicParams.isUseHttps()) { @@ -598,21 +562,9 @@ public interface BusConsumer { props.setProperty("contenttype", "application/json"); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } + props.putAll(busTopicParams.getAdditionalProps()); } - - MRClientFactory.prop = props; - this.consumer.setProps(props); - - logger.info("{}: CREATION", this); - } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); - + return props; } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java new file mode 100644 index 00000000..298607b5 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP POLICY + * ================================================================================ + * Copyright (C) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END============================================ + * =================================================================== + * + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.Properties; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +public class BusHelper { + + private BusHelper() { + /* no constructor */ + } + + /** + * Complete the properties param with common fields for both BusConsumer and BusPublisher. + * @param busTopicParams topics + * @param dme2RouteOffer route + * @param props properties + */ + public static void setCommonProperties(BusTopicParams busTopicParams, String dme2RouteOffer, Properties props) { + props.setProperty("Environment", busTopicParams.getEnvironment()); + props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); + + if (busTopicParams.getPartner() != null) { + props.setProperty("Partner", busTopicParams.getPartner()); + } + if (dme2RouteOffer != null) { + props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + + props.setProperty("Latitude", busTopicParams.getLatitude()); + props.setProperty("Longitude", busTopicParams.getLongitude()); + + /* These are optional, will default to these values if not set in additionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + } + + /** + * Throws exception when any of the checks are invalid. + * @param busTopicParams topics + * @param topicType topic type (sink or source) + */ + public static void validateBusTopicParams(BusTopicParams busTopicParams, String topicType) { + if (busTopicParams.isEnvironmentInvalid()) { + throw paramException(busTopicParams.getTopic(), topicType, + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + } + if (busTopicParams.isAftEnvironmentInvalid()) { + throw paramException(busTopicParams.getTopic(), topicType, + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + } + if (busTopicParams.isLatitudeInvalid()) { + throw paramException(busTopicParams.getTopic(), topicType, + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + } + if (busTopicParams.isLongitudeInvalid()) { + throw paramException(busTopicParams.getTopic(), topicType, + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + } + } + + private static IllegalArgumentException paramException(String topic, String topicType, String propertyName) { + return new IllegalArgumentException("Missing " + topicType + "." + + topic + propertyName + " property for DME2 in DMaaP"); + + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 92f7bc6f..ef8e1742 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -49,7 +49,9 @@ import org.slf4j.LoggerFactory; public interface BusPublisher { - public static final String NO_MESSAGE_PROVIDED = "No message provided"; + String NO_MESSAGE_PROVIDED = "No message provided"; + String LOG_CLOSE = "{}: CLOSE"; + String LOG_CLOSE_FAILED = "{}: CLOSE FAILED"; /** * sends a message. @@ -59,19 +61,19 @@ public interface BusPublisher { * @return true if success, false otherwise * @throws IllegalArgumentException if no message provided */ - public boolean send(String partitionId, String message); + boolean send(String partitionId, String message); /** * closes the publisher. */ - public void close(); + void close(); /** * Cambria based library publisher. */ - public static class CambriaPublisherWrapper implements BusPublisher { + class CambriaPublisherWrapper implements BusPublisher { - private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); /** * The actual Cambria publisher. @@ -133,7 +135,7 @@ public interface BusPublisher { @Override public void close() { - logger.info("{}: CLOSE", this); + logger.info(LOG_CLOSE, this); try { this.publisher.close(); @@ -152,17 +154,17 @@ public interface BusPublisher { /** * Kafka based library publisher. */ - public static class KafkaPublisherWrapper implements BusPublisher { + class KafkaPublisherWrapper implements BusPublisher { - private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private String topic; + private final String topic; /** * Kafka publisher. */ - private Producer<String, String> producer; + private final Producer<String, String> producer; protected Properties kafkaProps; /** @@ -182,9 +184,7 @@ public interface BusPublisher { kafkaProps = new Properties(); kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - kafkaProps.put(entry.getKey(), entry.getValue()); - } + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) { kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); @@ -218,7 +218,7 @@ public interface BusPublisher { @Override public void close() { - logger.info("{}: CLOSE", this); + logger.info(LOG_CLOSE, this); try { this.producer.close(); @@ -237,9 +237,9 @@ public interface BusPublisher { /** * DmaapClient library wrapper. */ - public abstract class DmaapPublisherWrapper implements BusPublisher { + abstract class DmaapPublisherWrapper implements BusPublisher { - private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); /** * MR based Publisher. @@ -320,17 +320,17 @@ public interface BusPublisher { @Override public void close() { - logger.info("{}: CLOSE", this); + logger.info(LOG_CLOSE, this); try { this.publisher.close(1, TimeUnit.SECONDS); } catch (InterruptedException e) { - logger.warn("{}: CLOSE FAILED", this, e); + logger.warn(LOG_CLOSE_FAILED, this, e); Thread.currentThread().interrupt(); } catch (Exception e) { - logger.warn("{}: CLOSE FAILED", this, e); + logger.warn(LOG_CLOSE_FAILED, this, e); } } @@ -363,7 +363,7 @@ public interface BusPublisher { /** * DmaapClient library wrapper. */ - public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { + class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { /** * MR based Publisher. */ @@ -374,7 +374,7 @@ public interface BusPublisher { } } - public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { + class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { /** * Constructor. @@ -395,33 +395,10 @@ public interface BusPublisher { String serviceName = busTopicParams.getServers().get(0); /* These are required, no defaults */ - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); - } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - // ServiceName also a default, found in additionalProps - - /* These are optional, will default to these values if not set in optionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); + BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props); - /* These should not change */ - props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "POST"); if (busTopicParams.isAdditionalPropsValid()) { @@ -432,22 +409,7 @@ public interface BusPublisher { } private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) { - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } + BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) { throw new IllegalArgumentException("Must provide at least " @@ -468,11 +430,5 @@ public interface BusPublisher { } } } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); - - } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java index daeaea13..3372e0aa 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java @@ -4,6 +4,7 @@ * ================================================================================ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +38,7 @@ public abstract class TopicBase implements Topic { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(TopicBase.class); + private static final Logger logger = LoggerFactory.getLogger(TopicBase.class); /** * List of servers. diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java index 4aecd1e0..7ce0becd 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java @@ -30,7 +30,7 @@ import java.util.Properties; public interface HttpServletServerFactory { /** - * Builds an http or https rest server with support for servlets. + * Builds a http or https rest server with support for servlets. * * @param name name * @param https use secured http over tls connection @@ -47,7 +47,7 @@ public interface HttpServletServerFactory { boolean swagger, boolean managed); /** - * Builds an http rest server with support for servlets. + * Builds a http rest server with support for servlets. * * @param name name * @param host binding host @@ -70,7 +70,7 @@ public interface HttpServletServerFactory { List<HttpServletServer> build(Properties properties); /** - * Builds an http or https server to manage static resources. + * Builds a http or https server to manage static resources. * * @param name name * @param https use secured http over tls connection diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 94e7c0c7..86b32e69 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -26,6 +26,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -66,7 +67,7 @@ public class BusConsumerTest extends TopicTestBase { } @Test - public void testFetchingBusConsumer() throws InterruptedException { + public void testFetchingBusConsumer() { // should not be negative var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build()); assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); @@ -174,7 +175,7 @@ public class BusConsumerTest extends TopicTestBase { @Test public void testCambriaConsumerWrapperClose() { CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build()); - assertThatCode(() -> cons.close()).doesNotThrowAnyException(); + assertThatCode(cons::close).doesNotThrowAnyException(); } @Test @@ -183,7 +184,7 @@ public class BusConsumerTest extends TopicTestBase { } @Test - public void testDmaapConsumerWrapper() throws Exception { + public void testDmaapConsumerWrapper() { // verify that different wrappers can be built assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException(); } @@ -229,7 +230,7 @@ public class BusConsumerTest extends TopicTestBase { } @Test - public void testDmaapConsumerWrapperClose() throws Exception { + public void testDmaapConsumerWrapperClose() { assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException(); } @@ -301,18 +302,18 @@ public class BusConsumerTest extends TopicTestBase { } @Test - public void testKafkaConsumerWrapper() throws Exception { + public void testKafkaConsumerWrapper() { // verify that different wrappers can be built assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException(); } @Test(expected = IllegalArgumentException.class) - public void testKafkaConsumerWrapper_InvalidTopic() throws Exception { + public void testKafkaConsumerWrapper_InvalidTopic() { new KafkaConsumerWrapper(makeBuilder().topic(null).build()); } - @Test(expected = java.lang.IllegalStateException.class) - public void testKafkaConsumerWrapperFetch() throws Exception { + @Test + public void testKafkaConsumerWrapperFetch() { //Setup Properties for consumer Properties kafkaProps = new Properties(); @@ -331,17 +332,17 @@ public class BusConsumerTest extends TopicTestBase { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps); kafka.consumer = consumer; - assertFalse(kafka.fetch().iterator().hasNext()); + assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext()); consumer.close(); } @Test - public void testKafkaConsumerWrapperClose() throws Exception { + public void testKafkaConsumerWrapperClose() { assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException(); } @Test - public void testKafkaConsumerWrapperToString() throws Exception { + public void testKafkaConsumerWrapperToString() { assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString()); } @@ -352,7 +353,7 @@ public class BusConsumerTest extends TopicTestBase { } @Override - public Iterable<String> fetch() throws IOException { + public Iterable<String> fetch() { return null; } } |