diff options
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java | 57 |
1 files changed, 21 insertions, 36 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index 8050de2a..e4064c5d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -3,14 +3,14 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -52,9 +52,9 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource /** * Constructor. - * + * * @param busTopicParams Parameters object containing all the required inputs - * + * * @throws IllegalArgumentException An invalid parameter passed in */ public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) { @@ -75,7 +75,7 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource try { this.init(); } catch (Exception e) { - logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", + logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e); throw new IllegalArgumentException(e); } @@ -87,53 +87,38 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource */ @Override public void init() throws MalformedURLException { + BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .useHttps(this.useHttps); + if (anyNullOrEmpty(this.userName, this.password)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.topic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) - .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps) + this.consumer = new BusConsumer.CambriaConsumerWrapper(builder .allowSelfSignedCerts(this.allowSelfSignedCerts) .build()); } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.topic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) + this.consumer = new BusConsumer.CambriaConsumerWrapper(builder .userName(this.userName) .password(this.password) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) - .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps) .allowSelfSignedCerts(this.allowSelfSignedCerts) .build()); } else { - this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.topic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) + this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder .userName(this.userName) .password(this.password) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) - .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) .environment(this.environment) .aftEnvironment(this.aftEnvironment) .partner(this.partner) .latitude(this.latitude) .longitude(this.longitude) .additionalProps(this.additionalProps) - .useHttps(this.useHttps).build()); + .build()); } logger.info("{}: INITTED", this); |