summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java54
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java19
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java257
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java78
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java23
5 files changed, 307 insertions, 124 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
index 96ab6c63..4285b3a9 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
@@ -76,12 +77,12 @@ public interface DmaapTopicSourceFactory {
* @throws IllegalArgumentException if invalid parameters are present
*/
public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+ String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
+ boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
/**
* Instantiates a new DMAAP Topic Source
- *
+ *
* @param servers list of servers
* @param topic topic name
* @param apiKey API Key
@@ -101,14 +102,14 @@ public interface DmaapTopicSourceFactory {
* @param managed is this endpoind managed?
* @param useHttps does the connection use HTTPS?
* @param allowSelfSignedCerts does connection allow self-signed certificates?
- *
+ *
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- String environment, String aftEnvironment, String partner, String latitude, String longitude,
- Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+ String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
+ String environment, String aftEnvironment, String partner, String latitude, String longitude,
+ Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
/**
* Instantiates a new DMAAP Topic Source
@@ -203,9 +204,26 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
return dmaapTopicSources.get(topic);
}
- DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret,
- userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment,
- aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
+ DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .userName(userName)
+ .password(password)
+ .consumerGroup(consumerGroup)
+ .consumerInstance(consumerInstance)
+ .fetchTimeout(fetchTimeout)
+ .fetchLimit(fetchLimit)
+ .environment(environment)
+ .aftEnvironment(aftEnvironment)
+ .partner(partner)
+ .latitude(latitude)
+ .longitude(longitude)
+ .additionalProps(additionalProps)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
if (managed) {
dmaapTopicSources.put(topic, dmaapTopicSource);
@@ -237,8 +255,20 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
}
DmaapTopicSource dmaapTopicSource =
- new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password,
- consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
+ new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .userName(userName)
+ .password(password)
+ .consumerGroup(consumerGroup)
+ .consumerInstance(consumerInstance)
+ .fetchTimeout(fetchTimeout)
+ .fetchLimit(fetchLimit)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
if (managed) {
dmaapTopicSources.put(topic, dmaapTopicSource);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
index c6cf3095..4c3cbbf8 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Properties;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
@@ -63,8 +64,8 @@ public interface UebTopicSourceFactory {
* @throws IllegalArgumentException if invalid parameters are present
*/
public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
- boolean useHttps, boolean allowSelfSignedCerts);
+ String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
+ boolean useHttps, boolean allowSelfSignedCerts);
/**
* Instantiates a new UEB Topic Source
@@ -160,8 +161,18 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
return uebTopicSources.get(topic);
}
- UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(servers, topic, apiKey, apiSecret,
- consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
+ UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .consumerGroup(consumerGroup)
+ .consumerInstance(consumerInstance)
+ .fetchTimeout(fetchTimeout)
+ .fetchLimit(fetchLimit)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
if (managed) {
uebTopicSources.put(topic, uebTopicSource);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 97ebbd50..74912cae 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -22,6 +22,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.net.MalformedURLException;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
@@ -79,46 +80,35 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
/**
*
- * @param servers Bus servers
- * @param topic Bus Topic to be monitored
- * @param apiKey Bus API Key (optional)
- * @param apiSecret Bus API Secret (optional)
- * @param consumerGroup Bus Reader Consumer Group
- * @param consumerInstance Bus Reader Instance
- * @param fetchTimeout Bus fetch timeout
- * @param fetchLimit Bus fetch limit
- * @param useHttps does the bus use https
- * @param allowSelfSignedCerts are self-signed certificates allowed
- * @throws IllegalArgumentException An invalid parameter passed in
+ *
+ * @param busTopicParams@throws IllegalArgumentException An invalid parameter passed in
*/
- public SingleThreadedBusTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
- boolean allowSelfSignedCerts) {
+ public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
- super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
+ super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getApiKey(), busTopicParams.getApiSecret(), busTopicParams.isUseHttps(), busTopicParams.isAllowSelfSignedCerts());
- if (consumerGroup == null || consumerGroup.isEmpty()) {
+ if (busTopicParams.getConsumerGroup() == null || busTopicParams.getConsumerGroup().isEmpty()) {
this.consumerGroup = UUID.randomUUID().toString();
} else {
- this.consumerGroup = consumerGroup;
+ this.consumerGroup = busTopicParams.getConsumerGroup();
}
- if (consumerInstance == null || consumerInstance.isEmpty()) {
+ if (busTopicParams.getConsumerInstance() == null || busTopicParams.getConsumerInstance().isEmpty()) {
this.consumerInstance = NetworkUtil.getHostname();
} else {
- this.consumerInstance = consumerInstance;
+ this.consumerInstance = busTopicParams.getConsumerInstance();
}
- if (fetchTimeout <= 0) {
+ if (busTopicParams.getFetchTimeout() <= 0) {
this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
} else {
- this.fetchTimeout = fetchTimeout;
+ this.fetchTimeout = busTopicParams.getFetchTimeout();
}
- if (fetchLimit <= 0) {
+ if (busTopicParams.getFetchLimit() <= 0) {
this.fetchLimit = NO_LIMIT_FETCH;
} else {
- this.fetchLimit = fetchLimit;
+ this.fetchLimit = busTopicParams.getFetchLimit();
}
}
@@ -322,4 +312,225 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
return fetchLimit;
}
+ /**
+ * Member variables of this Params class are as follows
+ * servers DMaaP servers
+ * topic DMaaP Topic to be monitored
+ * apiKey DMaaP API Key (optional)
+ * apiSecret DMaaP API Secret (optional)
+ * consumerGroup DMaaP Reader Consumer Group
+ * consumerInstance DMaaP Reader Instance
+ * fetchTimeout DMaaP fetch timeout
+ * fetchLimit DMaaP fetch limit
+ * environment DME2 Environment
+ * aftEnvironment DME2 AFT Environment
+ * partner DME2 Partner
+ * latitude DME2 Latitude
+ * longitude DME2 Longitude
+ * additionalProps Additional properties to pass to DME2
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ *
+ */
+ public static class BusTopicParams {
+
+ public static TopicParamsBuilder builder() {
+ return new TopicParamsBuilder();
+ }
+ private List<String> servers;
+ private String topic;
+ private String apiKey;
+ private String apiSecret;
+ private String consumerGroup;
+ private String consumerInstance;
+ private int fetchTimeout;
+ private int fetchLimit;
+ private boolean useHttps;
+ private boolean allowSelfSignedCerts;
+
+ private String userName;
+ private String password;
+ private String environment;
+ private String aftEnvironment;
+ private String partner;
+ private String latitude;
+ private String longitude;
+ private Map<String, String> additionalProps;
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public String getEnvironment() {
+ return environment;
+ }
+
+ public String getAftEnvironment() {
+ return aftEnvironment;
+ }
+
+ public String getPartner() {
+ return partner;
+ }
+
+ public String getLatitude() {
+ return latitude;
+ }
+
+ public String getLongitude() {
+ return longitude;
+ }
+
+ public Map<String, String> getAdditionalProps() {
+ return additionalProps;
+ }
+
+ public List<String> getServers() {
+ return servers;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getApiKey() {
+ return apiKey;
+ }
+
+ public String getApiSecret() {
+ return apiSecret;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public String getConsumerInstance() {
+ return consumerInstance;
+ }
+
+ public int getFetchTimeout() {
+ return fetchTimeout;
+ }
+
+ public int getFetchLimit() {
+ return fetchLimit;
+ }
+
+ public boolean isUseHttps() {
+ return useHttps;
+ }
+
+ public boolean isAllowSelfSignedCerts() {
+ return allowSelfSignedCerts;
+ }
+
+
+ public static class TopicParamsBuilder {
+ BusTopicParams m = new BusTopicParams();
+
+ private TopicParamsBuilder() {
+ }
+
+ public TopicParamsBuilder servers(List<String> servers) {
+ this.m.servers = servers;
+ return this;
+ }
+
+ public TopicParamsBuilder topic(String topic) {
+ this.m.topic = topic;
+ return this;
+ }
+
+ public TopicParamsBuilder apiKey(String apiKey) {
+ this.m.apiKey = apiKey;
+ return this;
+ }
+
+ public TopicParamsBuilder apiSecret(String apiSecret) {
+ this.m.apiSecret = apiSecret;
+ return this;
+ }
+
+ public TopicParamsBuilder consumerGroup(String consumerGroup) {
+ this.m.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ public TopicParamsBuilder consumerInstance(String consumerInstance) {
+ this.m.consumerInstance = consumerInstance;
+ return this;
+ }
+
+ public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
+ this.m.fetchTimeout = fetchTimeout;
+ return this;
+ }
+
+ public TopicParamsBuilder fetchLimit(int fetchLimit) {
+ this.m.fetchLimit = fetchLimit;
+ return this;
+ }
+
+ public TopicParamsBuilder useHttps(boolean useHttps) {
+ this.m.useHttps = useHttps;
+ return this;
+ }
+
+ public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
+ this.m.allowSelfSignedCerts = allowSelfSignedCerts;
+ return this;
+ }
+
+ public TopicParamsBuilder userName(String userName) {
+ this.m.userName = userName;
+ return this;
+ }
+
+ public TopicParamsBuilder password(String password) {
+ this.m.password = password;
+ return this;
+ }
+
+ public TopicParamsBuilder environment(String environment) {
+ this.m.environment = environment;
+ return this;
+ }
+
+ public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
+ this.m.aftEnvironment = aftEnvironment;
+ return this;
+ }
+
+ public TopicParamsBuilder partner(String partner) {
+ this.m.partner = partner;
+ return this;
+ }
+
+ public TopicParamsBuilder latitude(String latitude) {
+ this.m.latitude = latitude;
+ return this;
+ }
+
+ public TopicParamsBuilder longitude(String longitude) {
+ this.m.longitude = longitude;
+ return this;
+ }
+
+ public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
+ this.m.additionalProps = additionalProps;
+ return this;
+ }
+
+ public BusTopicParams build() {
+ return m;
+ }
+
+ }
+
+ }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index 8ac41424..c6bd5568 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -21,7 +21,6 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.net.MalformedURLException;
-import java.util.List;
import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
@@ -52,81 +51,28 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
/**
*
- * @param servers DMaaP servers
- * @param topic DMaaP Topic to be monitored
- * @param apiKey DMaaP API Key (optional)
- * @param apiSecret DMaaP API Secret (optional)
- * @param consumerGroup DMaaP Reader Consumer Group
- * @param consumerInstance DMaaP Reader Instance
- * @param fetchTimeout DMaaP fetch timeout
- * @param fetchLimit DMaaP fetch limit
- * @param environment DME2 Environment
- * @param aftEnvironment DME2 AFT Environment
- * @param partner DME2 Partner
- * @param latitude DME2 Latitude
- * @param longitude DME2 Longitude
- * @param additionalProps Additional properties to pass to DME2
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- *
+ * @param busTopicParams Parameters object containing all the required inputs *
* @throws IllegalArgumentException An invalid parameter passed in
*/
- public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
- String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, String environment, String aftEnvironment, String partner, String latitude,
- String longitude, Map<String, String> additionalProps, boolean useHttps, boolean allowSelfSignedCerts) {
-
- super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
- allowSelfSignedCerts);
-
- this.userName = userName;
- this.password = password;
-
- this.environment = environment;
- this.aftEnvironment = aftEnvironment;
- this.partner = partner;
-
- this.latitude = latitude;
- this.longitude = longitude;
-
- this.additionalProps = additionalProps;
- try {
- this.init();
- } catch (Exception e) {
- logger.error("ERROR during init of topic {}", this.topic);
- throw new IllegalArgumentException(e);
- }
- }
+ public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
- /**
- *
- * @param servers DMaaP servers
- * @param topic DMaaP Topic to be monitored
- * @param apiKey DMaaP API Key (optional)
- * @param apiSecret DMaaP API Secret (optional)
- * @param consumerGroup DMaaP Reader Consumer Group
- * @param consumerInstance DMaaP Reader Instance
- * @param fetchTimeout DMaaP fetch timeout
- * @param fetchLimit DMaaP fetch limit
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
- String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
+ super(busTopicParams);
+ this.userName = busTopicParams.getUserName();
+ this.password = busTopicParams.getPassword();
- super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
- allowSelfSignedCerts);
+ this.environment = busTopicParams.getEnvironment();
+ this.aftEnvironment = busTopicParams.getAftEnvironment();
+ this.partner = busTopicParams.getPartner();
- this.userName = userName;
- this.password = password;
+ this.latitude = busTopicParams.getLatitude();
+ this.longitude = busTopicParams.getLongitude();
+ this.additionalProps = busTopicParams.getAdditionalProps();
try {
this.init();
} catch (Exception e) {
- logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
+ logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
throw new IllegalArgumentException(e);
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
index b7a20503..03273a2b 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
@@ -20,8 +20,6 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import java.util.List;
-
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
@@ -33,29 +31,16 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i
/**
*
- * @param servers UEB servers
- * @param topic UEB Topic to be monitored
- * @param apiKey UEB API Key (optional)
- * @param apiSecret UEB API Secret (optional)
- * @param consumerGroup UEB Reader Consumer Group
- * @param consumerInstance UEB Reader Instance
- * @param fetchTimeout UEB fetch timeout
- * @param fetchLimit UEB fetch limit
- * @param useHttps does topicSource use HTTPS?
- * @param allowSelfSignedCerts does topicSource allow self-signed certs?
- *
+ * @param busTopicParams Parameters object containing all the required inputs *
* @throws IllegalArgumentException An invalid parameter passed in
*/
- public SingleThreadedUebTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
- boolean allowSelfSignedCerts) {
+ public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) {
- super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps,
- allowSelfSignedCerts);
+ super(busTopicParams);
- this.allowSelfSignedCerts = allowSelfSignedCerts;
+ this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts();
this.init();
}