summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/aai/validation/config/TopicConfig.java14
-rw-r--r--src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java40
-rw-r--r--src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java33
-rw-r--r--src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java10
-rw-r--r--src/main/java/org/onap/aai/validation/services/EventPollingService.java9
5 files changed, 80 insertions, 26 deletions
diff --git a/src/main/java/org/onap/aai/validation/config/TopicConfig.java b/src/main/java/org/onap/aai/validation/config/TopicConfig.java
index 38d527e..f95a357 100644
--- a/src/main/java/org/onap/aai/validation/config/TopicConfig.java
+++ b/src/main/java/org/onap/aai/validation/config/TopicConfig.java
@@ -91,6 +91,7 @@ public class TopicConfig {
topicConfig.setConsumerGroup(getTopicProperties().getProperty(topicName + ".consumer.group"));
topicConfig.setConsumerId(getTopicProperties().getProperty(topicName + ".consumer.id"));
topicConfig.setTransportType(getTopicProperties().getProperty(topicName + ".transport.type"));
+ topicConfig.setProtocol(getTopicProperties().getProperty(topicName + ".protocol"));
topics.add(topicConfig);
}
}
@@ -203,10 +204,18 @@ public class TopicConfig {
this.transportType = transportType;
}
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+
@Override
public int hashCode() {
return Objects.hash(this.consumerGroup, this.consumerId, this.host, this.username, this.name,
- this.partition, this.password, this.transportType);
+ this.partition, this.password, this.transportType, this.protocol);
}
@Override
@@ -227,6 +236,7 @@ public class TopicConfig {
.append(partition, rhs.partition)
.append(password, rhs.password)
.append(transportType, rhs.transportType)
+ .append(protocol, rhs.protocol)
.isEquals();
// @formatter:on
}
@@ -235,7 +245,7 @@ public class TopicConfig {
public String toString() {
return "Topic [name=" + name + ", host=" + host + ", username=" + username + ", password=" + password
+ ", partition=" + partition + ", consumerGroup=" + consumerGroup + ", consumerId=" + consumerId
- + ", transportType =" + transportType + "]";
+ + ", transportType=" + transportType + ", protocol=" + protocol + "]";
}
}
}
diff --git a/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java b/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java
new file mode 100644
index 0000000..d51c966
--- /dev/null
+++ b/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java
@@ -0,0 +1,40 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.aai.validation.factory;
+
+import java.net.MalformedURLException;
+import org.onap.aai.event.client.DMaaPEventConsumer;
+
+public class DMaaPEventConsumerFactory {
+
+ public DMaaPEventConsumer createEventConsumer(String topicHost, String topicName, String topicUsername,
+ String topicPassword, String consumerGroup, String consumerId, String transportType, String protocol)
+ throws MalformedURLException {
+ return new DMaaPEventConsumer(topicHost, topicName, topicUsername, topicPassword, consumerGroup, consumerId,
+ DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, //
+ DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT,
+ transportType == null ? DMaaPEventConsumer.DEFAULT_TRANSPORT_TYPE : transportType,
+ protocol == null ? DMaaPEventConsumer.DEFAULT_PROTOCOL : protocol, //
+ null /* no filter */);
+ }
+
+}
diff --git a/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java b/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java
index fa4af74..2990e31 100644
--- a/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java
+++ b/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java
@@ -1,34 +1,39 @@
-/*
- * ============LICENSE_START===================================================
- * Copyright (c) 2018 Amdocs
- * ============================================================================
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=====================================================
+ * ============LICENSE_END=========================================================
*/
+
package org.onap.aai.validation.factory;
import org.onap.aai.event.client.DMaaPEventPublisher;
-public class DMaaPEventPublisherFactory {
-
+public class DMaaPEventPublisherFactory {
public DMaaPEventPublisher createEventPublisher(String topicHost, String topicName, String topicUsername,
- String topicPassword, String topicTransportType) {
- int defaultBatchSize = DMaaPEventPublisher.DEFAULT_BATCH_SIZE;
- long defaultBatchAge = DMaaPEventPublisher.DEFAULT_BATCH_AGE;
- int defaultBatchDelay = DMaaPEventPublisher.DEFAULT_BATCH_DELAY;
- return new DMaaPEventPublisher(topicHost, topicName, topicUsername, topicPassword, defaultBatchSize,
- defaultBatchAge, defaultBatchDelay, topicTransportType);
+ String topicPassword, String transportType, String protocol) {
+ return new DMaaPEventPublisher(topicHost, topicName, topicUsername, topicPassword,
+ DMaaPEventPublisher.DEFAULT_BATCH_SIZE, //
+ DMaaPEventPublisher.DEFAULT_BATCH_AGE, //
+ DMaaPEventPublisher.DEFAULT_BATCH_DELAY, //
+ transportType == null ? DMaaPEventPublisher.DEFAULT_TRANSPORT_TYPE : transportType,
+ protocol == null ? DMaaPEventPublisher.DEFAULT_PROTOCOL : protocol,
+ DMaaPEventPublisher.DEFAULT_CONTENT_TYPE);
}
}
diff --git a/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java b/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
index c52ff10..4b0b583 100644
--- a/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
+++ b/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
@@ -2,8 +2,8 @@
* ============LICENSE_START=======================================================
* org.onap.aai
* ================================================================================
- * Copyright © 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2018-2019 European Software Marketing Ltd.
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -85,9 +85,7 @@ public class ValidationEventPublisher implements MessagePublisher {
*/
@Override
public void publishMessages(Collection<String> messages) throws ValidationServiceException {
- if (!enablePublishing) {
- return;
- } else {
+ if (enablePublishing) {
applicationLogger.debug("Publishing messages: " + messages);
for (Topic topic : publisherTopics) {
retriesRemaining = retries;
@@ -99,7 +97,7 @@ public class ValidationEventPublisher implements MessagePublisher {
private void publishMessages(Collection<String> messages, Topic topic) throws ValidationServiceException {
DMaaPEventPublisher dMaapEventPublisher = dMaapFactory.createEventPublisher(topic.getHost(), topic.getName(),
- topic.getUsername(), topic.getPassword(), topic.getTransportType());
+ topic.getUsername(), topic.getPassword(), topic.getTransportType(), topic.getProtocol());
try {
// Add our message to the publisher's queue/bus
diff --git a/src/main/java/org/onap/aai/validation/services/EventPollingService.java b/src/main/java/org/onap/aai/validation/services/EventPollingService.java
index 4a85f57..bc0c260 100644
--- a/src/main/java/org/onap/aai/validation/services/EventPollingService.java
+++ b/src/main/java/org/onap/aai/validation/services/EventPollingService.java
@@ -33,6 +33,7 @@ import org.onap.aai.validation.config.TopicConfig.Topic;
import org.onap.aai.validation.controller.ValidationController;
import org.onap.aai.validation.exception.ValidationServiceError;
import org.onap.aai.validation.exception.ValidationServiceException;
+import org.onap.aai.validation.factory.DMaaPEventConsumerFactory;
import org.onap.aai.validation.logging.ApplicationMsgs;
import org.onap.aai.validation.logging.LogHelper;
import org.onap.aai.validation.logging.LogHelper.MdcParameter;
@@ -60,12 +61,12 @@ public class EventPollingService implements Runnable {
@Inject
public EventPollingService(TopicConfig topicConfig) throws ValidationServiceException {
consumers = new ArrayList<>();
+ DMaaPEventConsumerFactory factory = new DMaaPEventConsumerFactory();
for (Topic topic : topicConfig.getConsumerTopics()) {
try {
- consumers.add(new DMaaPEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(),
- topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(),
- DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT,
- topic.getTransportType()));
+ consumers.add(factory.createEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(),
+ topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(), topic.getTransportType(),
+ topic.getProtocol()));
} catch (MalformedURLException e) {
throw new ValidationServiceException(ValidationServiceError.EVENT_CLIENT_CONSUMER_INIT_ERROR, e);
}