diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java | 90 |
1 files changed, 23 insertions, 67 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 92f7bc6f..ef8e1742 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -49,7 +49,9 @@ import org.slf4j.LoggerFactory; public interface BusPublisher { - public static final String NO_MESSAGE_PROVIDED = "No message provided"; + String NO_MESSAGE_PROVIDED = "No message provided"; + String LOG_CLOSE = "{}: CLOSE"; + String LOG_CLOSE_FAILED = "{}: CLOSE FAILED"; /** * sends a message. @@ -59,19 +61,19 @@ public interface BusPublisher { * @return true if success, false otherwise * @throws IllegalArgumentException if no message provided */ - public boolean send(String partitionId, String message); + boolean send(String partitionId, String message); /** * closes the publisher. */ - public void close(); + void close(); /** * Cambria based library publisher. */ - public static class CambriaPublisherWrapper implements BusPublisher { + class CambriaPublisherWrapper implements BusPublisher { - private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); /** * The actual Cambria publisher. @@ -133,7 +135,7 @@ public interface BusPublisher { @Override public void close() { - logger.info("{}: CLOSE", this); + logger.info(LOG_CLOSE, this); try { this.publisher.close(); @@ -152,17 +154,17 @@ public interface BusPublisher { /** * Kafka based library publisher. */ - public static class KafkaPublisherWrapper implements BusPublisher { + class KafkaPublisherWrapper implements BusPublisher { - private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private String topic; + private final String topic; /** * Kafka publisher. */ - private Producer<String, String> producer; + private final Producer<String, String> producer; protected Properties kafkaProps; /** @@ -182,9 +184,7 @@ public interface BusPublisher { kafkaProps = new Properties(); kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - kafkaProps.put(entry.getKey(), entry.getValue()); - } + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) { kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); @@ -218,7 +218,7 @@ public interface BusPublisher { @Override public void close() { - logger.info("{}: CLOSE", this); + logger.info(LOG_CLOSE, this); try { this.producer.close(); @@ -237,9 +237,9 @@ public interface BusPublisher { /** * DmaapClient library wrapper. */ - public abstract class DmaapPublisherWrapper implements BusPublisher { + abstract class DmaapPublisherWrapper implements BusPublisher { - private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); /** * MR based Publisher. @@ -320,17 +320,17 @@ public interface BusPublisher { @Override public void close() { - logger.info("{}: CLOSE", this); + logger.info(LOG_CLOSE, this); try { this.publisher.close(1, TimeUnit.SECONDS); } catch (InterruptedException e) { - logger.warn("{}: CLOSE FAILED", this, e); + logger.warn(LOG_CLOSE_FAILED, this, e); Thread.currentThread().interrupt(); } catch (Exception e) { - logger.warn("{}: CLOSE FAILED", this, e); + logger.warn(LOG_CLOSE_FAILED, this, e); } } @@ -363,7 +363,7 @@ public interface BusPublisher { /** * DmaapClient library wrapper. */ - public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { + class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { /** * MR based Publisher. */ @@ -374,7 +374,7 @@ public interface BusPublisher { } } - public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { + class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { /** * Constructor. @@ -395,33 +395,10 @@ public interface BusPublisher { String serviceName = busTopicParams.getServers().get(0); /* These are required, no defaults */ - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); - } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - // ServiceName also a default, found in additionalProps - - /* These are optional, will default to these values if not set in optionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); + BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props); - /* These should not change */ - props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "POST"); if (busTopicParams.isAdditionalPropsValid()) { @@ -432,22 +409,7 @@ public interface BusPublisher { } private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) { - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } + BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) { throw new IllegalArgumentException("Must provide at least " @@ -468,11 +430,5 @@ public interface BusPublisher { } } } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); - - } } } |