From 563772ec9a100e9eb46fff1b5087985dbe07d04a Mon Sep 17 00:00:00 2001
From: "mark.j.leonard" <mark.j.leonard@gmail.com>
Date: Tue, 12 Feb 2019 16:49:30 +0000
Subject: Add a topic protocol configuration setting

Use the latest version of the event-client for DMaaP.

Read an (optional) protocol value from each topic configuration
properties file, and pass this to the Consumer and/or Publisher.
Use the default protocol if the configuration is not supplied.

Change-Id: I3d6264e1f32c1fbba097eafbe7fe7fbd744f1373
Issue-ID: AAI-2150
Signed-off-by: mark.j.leonard <mark.j.leonard@gmail.com>
---
 .../onap/aai/validation/config/TopicConfig.java    | 14 ++++++--
 .../factory/DMaaPEventConsumerFactory.java         | 40 ++++++++++++++++++++++
 .../factory/DMaaPEventPublisherFactory.java        | 33 ++++++++++--------
 .../publisher/ValidationEventPublisher.java        | 10 +++---
 .../validation/services/EventPollingService.java   |  9 ++---
 5 files changed, 80 insertions(+), 26 deletions(-)
 create mode 100644 src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java

(limited to 'src/main/java')

diff --git a/src/main/java/org/onap/aai/validation/config/TopicConfig.java b/src/main/java/org/onap/aai/validation/config/TopicConfig.java
index 38d527e..f95a357 100644
--- a/src/main/java/org/onap/aai/validation/config/TopicConfig.java
+++ b/src/main/java/org/onap/aai/validation/config/TopicConfig.java
@@ -91,6 +91,7 @@ public class TopicConfig {
                 topicConfig.setConsumerGroup(getTopicProperties().getProperty(topicName + ".consumer.group"));
                 topicConfig.setConsumerId(getTopicProperties().getProperty(topicName + ".consumer.id"));
                 topicConfig.setTransportType(getTopicProperties().getProperty(topicName + ".transport.type"));
+                topicConfig.setProtocol(getTopicProperties().getProperty(topicName + ".protocol"));
                 topics.add(topicConfig);
             }
         }
@@ -203,10 +204,18 @@ public class TopicConfig {
             this.transportType = transportType;
         }
 
+        public String getProtocol() {
+            return protocol;
+        }
+
+        public void setProtocol(String protocol) {
+            this.protocol = protocol;
+        }
+
         @Override
         public int hashCode() {
             return Objects.hash(this.consumerGroup, this.consumerId, this.host, this.username, this.name,
-                    this.partition, this.password, this.transportType);
+                    this.partition, this.password, this.transportType, this.protocol);
         }
 
         @Override
@@ -227,6 +236,7 @@ public class TopicConfig {
 	                  .append(partition, rhs.partition)
 	                  .append(password, rhs.password)
 	                  .append(transportType, rhs.transportType)
+	                  .append(protocol, rhs.protocol)
 	                  .isEquals();
 	     // @formatter:on
         }
@@ -235,7 +245,7 @@ public class TopicConfig {
         public String toString() {
             return "Topic [name=" + name + ", host=" + host + ", username=" + username + ", password=" + password
                     + ", partition=" + partition + ", consumerGroup=" + consumerGroup + ", consumerId=" + consumerId
-                    + ", transportType =" + transportType + "]";
+                    + ", transportType=" + transportType + ", protocol=" + protocol + "]";
         }
     }
 }
diff --git a/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java b/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java
new file mode 100644
index 0000000..d51c966
--- /dev/null
+++ b/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java
@@ -0,0 +1,40 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.aai.validation.factory;
+
+import java.net.MalformedURLException;
+import org.onap.aai.event.client.DMaaPEventConsumer;
+
+public class DMaaPEventConsumerFactory {
+
+    public DMaaPEventConsumer createEventConsumer(String topicHost, String topicName, String topicUsername,
+            String topicPassword, String consumerGroup, String consumerId, String transportType, String protocol)
+            throws MalformedURLException {
+        return new DMaaPEventConsumer(topicHost, topicName, topicUsername, topicPassword, consumerGroup, consumerId,
+                DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, //
+                DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT,
+                transportType == null ? DMaaPEventConsumer.DEFAULT_TRANSPORT_TYPE : transportType,
+                protocol == null ? DMaaPEventConsumer.DEFAULT_PROTOCOL : protocol, //
+                null /* no filter */);
+    }
+
+}
diff --git a/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java b/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java
index fa4af74..2990e31 100644
--- a/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java
+++ b/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java
@@ -1,34 +1,39 @@
-/*
- * ============LICENSE_START===================================================
- * Copyright (c) 2018 Amdocs
- * ============================================================================
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
+ * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * ============LICENSE_END=====================================================
+ * ============LICENSE_END=========================================================
  */
+
 package org.onap.aai.validation.factory;
 
 import org.onap.aai.event.client.DMaaPEventPublisher;
 
-public class DMaaPEventPublisherFactory {  
-
+public class DMaaPEventPublisherFactory {
 
     public DMaaPEventPublisher createEventPublisher(String topicHost, String topicName, String topicUsername,
-            String topicPassword, String topicTransportType) {
-        int defaultBatchSize = DMaaPEventPublisher.DEFAULT_BATCH_SIZE;
-        long defaultBatchAge = DMaaPEventPublisher.DEFAULT_BATCH_AGE;
-        int defaultBatchDelay = DMaaPEventPublisher.DEFAULT_BATCH_DELAY;
-        return new DMaaPEventPublisher(topicHost, topicName, topicUsername, topicPassword, defaultBatchSize,
-                defaultBatchAge, defaultBatchDelay, topicTransportType);
+            String topicPassword, String transportType, String protocol) {
+        return new DMaaPEventPublisher(topicHost, topicName, topicUsername, topicPassword,
+                DMaaPEventPublisher.DEFAULT_BATCH_SIZE, //
+                DMaaPEventPublisher.DEFAULT_BATCH_AGE, //
+                DMaaPEventPublisher.DEFAULT_BATCH_DELAY, //
+                transportType == null ? DMaaPEventPublisher.DEFAULT_TRANSPORT_TYPE : transportType,
+                protocol == null ? DMaaPEventPublisher.DEFAULT_PROTOCOL : protocol,
+                DMaaPEventPublisher.DEFAULT_CONTENT_TYPE);
     }
 
 }
diff --git a/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java b/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
index c52ff10..4b0b583 100644
--- a/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
+++ b/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
@@ -2,8 +2,8 @@
  * ============LICENSE_START=======================================================
  * org.onap.aai
  * ================================================================================
- * Copyright © 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2018-2019 European Software Marketing Ltd.
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -85,9 +85,7 @@ public class ValidationEventPublisher implements MessagePublisher {
      */
     @Override
     public void publishMessages(Collection<String> messages) throws ValidationServiceException {
-        if (!enablePublishing) {
-            return;
-        } else {
+        if (enablePublishing) {
             applicationLogger.debug("Publishing messages: " + messages);
             for (Topic topic : publisherTopics) {
                 retriesRemaining = retries;
@@ -99,7 +97,7 @@ public class ValidationEventPublisher implements MessagePublisher {
     private void publishMessages(Collection<String> messages, Topic topic) throws ValidationServiceException {
 
         DMaaPEventPublisher dMaapEventPublisher = dMaapFactory.createEventPublisher(topic.getHost(), topic.getName(),
-                topic.getUsername(), topic.getPassword(), topic.getTransportType());
+                topic.getUsername(), topic.getPassword(), topic.getTransportType(), topic.getProtocol());
 
         try {
             // Add our message to the publisher's queue/bus
diff --git a/src/main/java/org/onap/aai/validation/services/EventPollingService.java b/src/main/java/org/onap/aai/validation/services/EventPollingService.java
index 4a85f57..bc0c260 100644
--- a/src/main/java/org/onap/aai/validation/services/EventPollingService.java
+++ b/src/main/java/org/onap/aai/validation/services/EventPollingService.java
@@ -33,6 +33,7 @@ import org.onap.aai.validation.config.TopicConfig.Topic;
 import org.onap.aai.validation.controller.ValidationController;
 import org.onap.aai.validation.exception.ValidationServiceError;
 import org.onap.aai.validation.exception.ValidationServiceException;
+import org.onap.aai.validation.factory.DMaaPEventConsumerFactory;
 import org.onap.aai.validation.logging.ApplicationMsgs;
 import org.onap.aai.validation.logging.LogHelper;
 import org.onap.aai.validation.logging.LogHelper.MdcParameter;
@@ -60,12 +61,12 @@ public class EventPollingService implements Runnable {
     @Inject
     public EventPollingService(TopicConfig topicConfig) throws ValidationServiceException {
         consumers = new ArrayList<>();
+        DMaaPEventConsumerFactory factory = new DMaaPEventConsumerFactory();
         for (Topic topic : topicConfig.getConsumerTopics()) {
             try {
-                consumers.add(new DMaaPEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(),
-                        topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(),
-                        DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT,
-                        topic.getTransportType()));
+                consumers.add(factory.createEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(),
+                        topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(), topic.getTransportType(),
+                        topic.getProtocol()));
             } catch (MalformedURLException e) {
                 throw new ValidationServiceException(ValidationServiceError.EVENT_CLIENT_CONSUMER_INIT_ERROR, e);
             }
-- 
cgit 1.2.3-korg