diff options
Diffstat (limited to 'policy-endpoints')
2 files changed, 23 insertions, 30 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index ee41150f..8542d572 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -299,8 +299,8 @@ public interface BusConsumer { try { for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); - for (ConsumerRecord<String, String> record : partitionRecords) { - messages.add(record.value()); + for (ConsumerRecord<String, String> partitionRecord : partitionRecords) { + messages.add(partitionRecord.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 6c6a9183..92f7bc6f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. * Modifications Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory; public interface BusPublisher { + public static final String NO_MESSAGE_PROVIDED = "No message provided"; + /** * sends a message. * @@ -99,7 +101,6 @@ public interface BusPublisher { } } - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); } @@ -118,7 +119,7 @@ public interface BusPublisher { @Override public boolean send(String partitionId, String message) { if (message == null) { - throw new IllegalArgumentException("No message provided"); + throw new IllegalArgumentException(NO_MESSAGE_PROVIDED); } try { @@ -141,7 +142,6 @@ public interface BusPublisher { } } - @Override public String toString() { return "CambriaPublisherWrapper []"; @@ -178,7 +178,7 @@ public interface BusPublisher { this.topic = busTopicParams.getTopic(); - //Setup Properties for consumer + // Setup Properties for consumer kafkaProps = new Properties(); kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); if (busTopicParams.isAdditionalPropsValid()) { @@ -199,15 +199,15 @@ public interface BusPublisher { @Override public boolean send(String partitionId, String message) { if (message == null) { - throw new IllegalArgumentException("No message provided"); + throw new IllegalArgumentException(NO_MESSAGE_PROVIDED); } try { - //Create the record - ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, - UUID.randomUUID().toString(), message); + // Create the record + ProducerRecord<String, String> producerRecord = + new ProducerRecord<>(topic, UUID.randomUUID().toString(), message); - this.producer.send(record); + this.producer.send(producerRecord); producer.flush(); } catch (Exception e) { logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); @@ -227,7 +227,6 @@ public interface BusPublisher { } } - @Override public String toString() { return "KafkaPublisherWrapper []"; @@ -259,12 +258,10 @@ public interface BusPublisher { protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic, String username, String password, boolean useHttps) { - if (StringUtils.isBlank(topic)) { throw new IllegalArgumentException("No topic for DMaaP"); } - configureProtocol(topic, protocol, servers, useHttps); this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); @@ -291,7 +288,7 @@ public interface BusPublisher { } private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers, - boolean useHttps) { + boolean useHttps) { if (protocol == ProtocolTypeConstants.AAF_AUTH) { if (servers == null || servers.isEmpty()) { @@ -304,7 +301,6 @@ public interface BusPublisher { dmaapServers.add(server + port); } - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); @@ -341,7 +337,7 @@ public interface BusPublisher { @Override public boolean send(String partitionId, String message) { if (message == null) { - throw new IllegalArgumentException("No message provided"); + throw new IllegalArgumentException(NO_MESSAGE_PROVIDED); } this.publisher.setPubResponse(new MRPublisherResponse()); @@ -391,9 +387,8 @@ public interface BusPublisher { busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps()); String dme2RouteOffer = busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null; + ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) + : null; validateParams(busTopicParams, dme2RouteOffer); @@ -454,14 +449,12 @@ public interface BusPublisher { PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); } - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) { + throw new IllegalArgumentException("Must provide at least " + + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); } } |