summaryrefslogtreecommitdiffstats
path: root/policy-endpoints
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java6
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java47
2 files changed, 23 insertions, 30 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index ee41150f..8542d572 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
* Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -299,8 +299,8 @@ public interface BusConsumer {
try {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- messages.add(record.value());
+ for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
+ messages.add(partitionRecord.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 6c6a9183..92f7bc6f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
* Modifications Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
public interface BusPublisher {
+ public static final String NO_MESSAGE_PROVIDED = "No message provided";
+
/**
* sends a message.
*
@@ -99,7 +101,6 @@ public interface BusPublisher {
}
}
-
if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
}
@@ -118,7 +119,7 @@ public interface BusPublisher {
@Override
public boolean send(String partitionId, String message) {
if (message == null) {
- throw new IllegalArgumentException("No message provided");
+ throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
}
try {
@@ -141,7 +142,6 @@ public interface BusPublisher {
}
}
-
@Override
public String toString() {
return "CambriaPublisherWrapper []";
@@ -178,7 +178,7 @@ public interface BusPublisher {
this.topic = busTopicParams.getTopic();
- //Setup Properties for consumer
+ // Setup Properties for consumer
kafkaProps = new Properties();
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
if (busTopicParams.isAdditionalPropsValid()) {
@@ -199,15 +199,15 @@ public interface BusPublisher {
@Override
public boolean send(String partitionId, String message) {
if (message == null) {
- throw new IllegalArgumentException("No message provided");
+ throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
}
try {
- //Create the record
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
- UUID.randomUUID().toString(), message);
+ // Create the record
+ ProducerRecord<String, String> producerRecord =
+ new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
- this.producer.send(record);
+ this.producer.send(producerRecord);
producer.flush();
} catch (Exception e) {
logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
@@ -227,7 +227,6 @@ public interface BusPublisher {
}
}
-
@Override
public String toString() {
return "KafkaPublisherWrapper []";
@@ -259,12 +258,10 @@ public interface BusPublisher {
protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
String username, String password, boolean useHttps) {
-
if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("No topic for DMaaP");
}
-
configureProtocol(topic, protocol, servers, useHttps);
this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
@@ -291,7 +288,7 @@ public interface BusPublisher {
}
private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
- boolean useHttps) {
+ boolean useHttps) {
if (protocol == ProtocolTypeConstants.AAF_AUTH) {
if (servers == null || servers.isEmpty()) {
@@ -304,7 +301,6 @@ public interface BusPublisher {
dmaapServers.add(server + port);
}
-
this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
@@ -341,7 +337,7 @@ public interface BusPublisher {
@Override
public boolean send(String partitionId, String message) {
if (message == null) {
- throw new IllegalArgumentException("No message provided");
+ throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
}
this.publisher.setPubResponse(new MRPublisherResponse());
@@ -391,9 +387,8 @@ public interface BusPublisher {
busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null;
+ ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
+ : null;
validateParams(busTopicParams, dme2RouteOffer);
@@ -454,14 +449,12 @@ public interface BusPublisher {
PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
}
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
+ throw new IllegalArgumentException("Must provide at least "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
}
}