aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java90
1 files changed, 23 insertions, 67 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 92f7bc6f..ef8e1742 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -49,7 +49,9 @@ import org.slf4j.LoggerFactory;
public interface BusPublisher {
- public static final String NO_MESSAGE_PROVIDED = "No message provided";
+ String NO_MESSAGE_PROVIDED = "No message provided";
+ String LOG_CLOSE = "{}: CLOSE";
+ String LOG_CLOSE_FAILED = "{}: CLOSE FAILED";
/**
* sends a message.
@@ -59,19 +61,19 @@ public interface BusPublisher {
* @return true if success, false otherwise
* @throws IllegalArgumentException if no message provided
*/
- public boolean send(String partitionId, String message);
+ boolean send(String partitionId, String message);
/**
* closes the publisher.
*/
- public void close();
+ void close();
/**
* Cambria based library publisher.
*/
- public static class CambriaPublisherWrapper implements BusPublisher {
+ class CambriaPublisherWrapper implements BusPublisher {
- private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
/**
* The actual Cambria publisher.
@@ -133,7 +135,7 @@ public interface BusPublisher {
@Override
public void close() {
- logger.info("{}: CLOSE", this);
+ logger.info(LOG_CLOSE, this);
try {
this.publisher.close();
@@ -152,17 +154,17 @@ public interface BusPublisher {
/**
* Kafka based library publisher.
*/
- public static class KafkaPublisherWrapper implements BusPublisher {
+ class KafkaPublisherWrapper implements BusPublisher {
- private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- private String topic;
+ private final String topic;
/**
* Kafka publisher.
*/
- private Producer<String, String> producer;
+ private final Producer<String, String> producer;
protected Properties kafkaProps;
/**
@@ -182,9 +184,7 @@ public interface BusPublisher {
kafkaProps = new Properties();
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- kafkaProps.put(entry.getKey(), entry.getValue());
- }
+ kafkaProps.putAll(busTopicParams.getAdditionalProps());
}
if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
@@ -218,7 +218,7 @@ public interface BusPublisher {
@Override
public void close() {
- logger.info("{}: CLOSE", this);
+ logger.info(LOG_CLOSE, this);
try {
this.producer.close();
@@ -237,9 +237,9 @@ public interface BusPublisher {
/**
* DmaapClient library wrapper.
*/
- public abstract class DmaapPublisherWrapper implements BusPublisher {
+ abstract class DmaapPublisherWrapper implements BusPublisher {
- private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
/**
* MR based Publisher.
@@ -320,17 +320,17 @@ public interface BusPublisher {
@Override
public void close() {
- logger.info("{}: CLOSE", this);
+ logger.info(LOG_CLOSE, this);
try {
this.publisher.close(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- logger.warn("{}: CLOSE FAILED", this, e);
+ logger.warn(LOG_CLOSE_FAILED, this, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
- logger.warn("{}: CLOSE FAILED", this, e);
+ logger.warn(LOG_CLOSE_FAILED, this, e);
}
}
@@ -363,7 +363,7 @@ public interface BusPublisher {
/**
* DmaapClient library wrapper.
*/
- public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
+ class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
/**
* MR based Publisher.
*/
@@ -374,7 +374,7 @@ public interface BusPublisher {
}
}
- public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
+ class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
/**
* Constructor.
@@ -395,33 +395,10 @@ public interface BusPublisher {
String serviceName = busTopicParams.getServers().get(0);
/* These are required, no defaults */
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
-
props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
- }
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- // ServiceName also a default, found in additionalProps
-
- /* These are optional, will default to these values if not set in optionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
+ BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
- /* These should not change */
- props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "POST");
if (busTopicParams.isAdditionalPropsValid()) {
@@ -432,22 +409,7 @@ public interface BusPublisher {
}
private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
+ BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
throw new IllegalArgumentException("Must provide at least "
@@ -468,11 +430,5 @@ public interface BusPublisher {
}
}
}
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
-
- }
}
}