summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java57
1 files changed, 21 insertions, 36 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index 8050de2a..e4064c5d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -3,14 +3,14 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
+ * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,9 +52,9 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
/**
* Constructor.
- *
+ *
* @param busTopicParams Parameters object containing all the required inputs
- *
+ *
* @throws IllegalArgumentException An invalid parameter passed in
*/
public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
@@ -75,7 +75,7 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
try {
this.init();
} catch (Exception e) {
- logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}",
+ logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}",
topic, e.getMessage(), e);
throw new IllegalArgumentException(e);
}
@@ -87,53 +87,38 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
*/
@Override
public void init() throws MalformedURLException {
+ BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
+ .servers(this.servers)
+ .topic(this.topic)
+ .apiKey(this.apiKey)
+ .apiSecret(this.apiSecret)
+ .consumerGroup(this.consumerGroup)
+ .consumerInstance(this.consumerInstance)
+ .fetchTimeout(this.fetchTimeout)
+ .fetchLimit(this.fetchLimit)
+ .useHttps(this.useHttps);
+
if (anyNullOrEmpty(this.userName, this.password)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.topic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
- .fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
- .useHttps(this.useHttps)
+ this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
.allowSelfSignedCerts(this.allowSelfSignedCerts)
.build());
} else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.topic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
+ this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
.userName(this.userName)
.password(this.password)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
- .fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
- .useHttps(this.useHttps)
.allowSelfSignedCerts(this.allowSelfSignedCerts)
.build());
} else {
- this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.topic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
+ this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder
.userName(this.userName)
.password(this.password)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
- .fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
.environment(this.environment)
.aftEnvironment(this.aftEnvironment)
.partner(this.partner)
.latitude(this.latitude)
.longitude(this.longitude)
.additionalProps(this.additionalProps)
- .useHttps(this.useHttps).build());
+ .build());
}
logger.info("{}: INITTED", this);