summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/validation/services/EventPollingService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/validation/services/EventPollingService.java')
-rw-r--r--src/main/java/org/onap/aai/validation/services/EventPollingService.java9
1 files changed, 5 insertions, 4 deletions
diff --git a/src/main/java/org/onap/aai/validation/services/EventPollingService.java b/src/main/java/org/onap/aai/validation/services/EventPollingService.java
index 4a85f57..bc0c260 100644
--- a/src/main/java/org/onap/aai/validation/services/EventPollingService.java
+++ b/src/main/java/org/onap/aai/validation/services/EventPollingService.java
@@ -33,6 +33,7 @@ import org.onap.aai.validation.config.TopicConfig.Topic;
import org.onap.aai.validation.controller.ValidationController;
import org.onap.aai.validation.exception.ValidationServiceError;
import org.onap.aai.validation.exception.ValidationServiceException;
+import org.onap.aai.validation.factory.DMaaPEventConsumerFactory;
import org.onap.aai.validation.logging.ApplicationMsgs;
import org.onap.aai.validation.logging.LogHelper;
import org.onap.aai.validation.logging.LogHelper.MdcParameter;
@@ -60,12 +61,12 @@ public class EventPollingService implements Runnable {
@Inject
public EventPollingService(TopicConfig topicConfig) throws ValidationServiceException {
consumers = new ArrayList<>();
+ DMaaPEventConsumerFactory factory = new DMaaPEventConsumerFactory();
for (Topic topic : topicConfig.getConsumerTopics()) {
try {
- consumers.add(new DMaaPEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(),
- topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(),
- DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT,
- topic.getTransportType()));
+ consumers.add(factory.createEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(),
+ topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(), topic.getTransportType(),
+ topic.getProtocol()));
} catch (MalformedURLException e) {
throw new ValidationServiceException(ValidationServiceError.EVENT_CLIENT_CONSUMER_INIT_ERROR, e);
}