diff options
Diffstat (limited to 'src/main/java/org/onap')
5 files changed, 80 insertions, 26 deletions
diff --git a/src/main/java/org/onap/aai/validation/config/TopicConfig.java b/src/main/java/org/onap/aai/validation/config/TopicConfig.java index 38d527e..f95a357 100644 --- a/src/main/java/org/onap/aai/validation/config/TopicConfig.java +++ b/src/main/java/org/onap/aai/validation/config/TopicConfig.java @@ -91,6 +91,7 @@ public class TopicConfig { topicConfig.setConsumerGroup(getTopicProperties().getProperty(topicName + ".consumer.group")); topicConfig.setConsumerId(getTopicProperties().getProperty(topicName + ".consumer.id")); topicConfig.setTransportType(getTopicProperties().getProperty(topicName + ".transport.type")); + topicConfig.setProtocol(getTopicProperties().getProperty(topicName + ".protocol")); topics.add(topicConfig); } } @@ -203,10 +204,18 @@ public class TopicConfig { this.transportType = transportType; } + public String getProtocol() { + return protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } + @Override public int hashCode() { return Objects.hash(this.consumerGroup, this.consumerId, this.host, this.username, this.name, - this.partition, this.password, this.transportType); + this.partition, this.password, this.transportType, this.protocol); } @Override @@ -227,6 +236,7 @@ public class TopicConfig { .append(partition, rhs.partition) .append(password, rhs.password) .append(transportType, rhs.transportType) + .append(protocol, rhs.protocol) .isEquals(); // @formatter:on } @@ -235,7 +245,7 @@ public class TopicConfig { public String toString() { return "Topic [name=" + name + ", host=" + host + ", username=" + username + ", password=" + password + ", partition=" + partition + ", consumerGroup=" + consumerGroup + ", consumerId=" + consumerId - + ", transportType =" + transportType + "]"; + + ", transportType=" + transportType + ", protocol=" + protocol + "]"; } } } diff --git a/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java b/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java new file mode 100644 index 0000000..d51c966 --- /dev/null +++ b/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java @@ -0,0 +1,40 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (c) 2018-2019 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.validation.factory; + +import java.net.MalformedURLException; +import org.onap.aai.event.client.DMaaPEventConsumer; + +public class DMaaPEventConsumerFactory { + + public DMaaPEventConsumer createEventConsumer(String topicHost, String topicName, String topicUsername, + String topicPassword, String consumerGroup, String consumerId, String transportType, String protocol) + throws MalformedURLException { + return new DMaaPEventConsumer(topicHost, topicName, topicUsername, topicPassword, consumerGroup, consumerId, + DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, // + DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT, + transportType == null ? DMaaPEventConsumer.DEFAULT_TRANSPORT_TYPE : transportType, + protocol == null ? DMaaPEventConsumer.DEFAULT_PROTOCOL : protocol, // + null /* no filter */); + } + +} diff --git a/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java b/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java index fa4af74..2990e31 100644 --- a/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java +++ b/src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java @@ -1,34 +1,39 @@ -/* - * ============LICENSE_START=================================================== - * Copyright (c) 2018 Amdocs - * ============================================================================ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (c) 2018-2019 European Software Marketing Ltd. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END===================================================== + * ============LICENSE_END========================================================= */ + package org.onap.aai.validation.factory; import org.onap.aai.event.client.DMaaPEventPublisher; -public class DMaaPEventPublisherFactory { - +public class DMaaPEventPublisherFactory { public DMaaPEventPublisher createEventPublisher(String topicHost, String topicName, String topicUsername, - String topicPassword, String topicTransportType) { - int defaultBatchSize = DMaaPEventPublisher.DEFAULT_BATCH_SIZE; - long defaultBatchAge = DMaaPEventPublisher.DEFAULT_BATCH_AGE; - int defaultBatchDelay = DMaaPEventPublisher.DEFAULT_BATCH_DELAY; - return new DMaaPEventPublisher(topicHost, topicName, topicUsername, topicPassword, defaultBatchSize, - defaultBatchAge, defaultBatchDelay, topicTransportType); + String topicPassword, String transportType, String protocol) { + return new DMaaPEventPublisher(topicHost, topicName, topicUsername, topicPassword, + DMaaPEventPublisher.DEFAULT_BATCH_SIZE, // + DMaaPEventPublisher.DEFAULT_BATCH_AGE, // + DMaaPEventPublisher.DEFAULT_BATCH_DELAY, // + transportType == null ? DMaaPEventPublisher.DEFAULT_TRANSPORT_TYPE : transportType, + protocol == null ? DMaaPEventPublisher.DEFAULT_PROTOCOL : protocol, + DMaaPEventPublisher.DEFAULT_CONTENT_TYPE); } } diff --git a/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java b/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java index c52ff10..4b0b583 100644 --- a/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java +++ b/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java @@ -2,8 +2,8 @@ * ============LICENSE_START======================================================= * org.onap.aai * ================================================================================ - * Copyright © 2018-2019 AT&T Intellectual Property. All rights reserved. - * Copyright © 2018-2019 European Software Marketing Ltd. + * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (c) 2018-2019 European Software Marketing Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,9 +85,7 @@ public class ValidationEventPublisher implements MessagePublisher { */ @Override public void publishMessages(Collection<String> messages) throws ValidationServiceException { - if (!enablePublishing) { - return; - } else { + if (enablePublishing) { applicationLogger.debug("Publishing messages: " + messages); for (Topic topic : publisherTopics) { retriesRemaining = retries; @@ -99,7 +97,7 @@ public class ValidationEventPublisher implements MessagePublisher { private void publishMessages(Collection<String> messages, Topic topic) throws ValidationServiceException { DMaaPEventPublisher dMaapEventPublisher = dMaapFactory.createEventPublisher(topic.getHost(), topic.getName(), - topic.getUsername(), topic.getPassword(), topic.getTransportType()); + topic.getUsername(), topic.getPassword(), topic.getTransportType(), topic.getProtocol()); try { // Add our message to the publisher's queue/bus diff --git a/src/main/java/org/onap/aai/validation/services/EventPollingService.java b/src/main/java/org/onap/aai/validation/services/EventPollingService.java index 4a85f57..bc0c260 100644 --- a/src/main/java/org/onap/aai/validation/services/EventPollingService.java +++ b/src/main/java/org/onap/aai/validation/services/EventPollingService.java @@ -33,6 +33,7 @@ import org.onap.aai.validation.config.TopicConfig.Topic; import org.onap.aai.validation.controller.ValidationController; import org.onap.aai.validation.exception.ValidationServiceError; import org.onap.aai.validation.exception.ValidationServiceException; +import org.onap.aai.validation.factory.DMaaPEventConsumerFactory; import org.onap.aai.validation.logging.ApplicationMsgs; import org.onap.aai.validation.logging.LogHelper; import org.onap.aai.validation.logging.LogHelper.MdcParameter; @@ -60,12 +61,12 @@ public class EventPollingService implements Runnable { @Inject public EventPollingService(TopicConfig topicConfig) throws ValidationServiceException { consumers = new ArrayList<>(); + DMaaPEventConsumerFactory factory = new DMaaPEventConsumerFactory(); for (Topic topic : topicConfig.getConsumerTopics()) { try { - consumers.add(new DMaaPEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(), - topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(), - DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT, - topic.getTransportType())); + consumers.add(factory.createEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(), + topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(), topic.getTransportType(), + topic.getProtocol())); } catch (MalformedURLException e) { throw new ValidationServiceException(ValidationServiceError.EVENT_CLIENT_CONSUMER_INIT_ERROR, e); } |