summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java32
1 files changed, 22 insertions, 10 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index db240b3d..70c37d55 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -122,28 +122,40 @@ public interface BusConsumer {
public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout,
int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
+ this(servers, topic, apiKey, apiSecret, null, null,
+ consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
+ useHttps, useSelfSignedCerts);
+ }
+
+ public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String username, String password,
+ String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
this.fetchTimeout = fetchTimeout;
this.builder = new CambriaClientBuilders.ConsumerBuilder();
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+
+ // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(fetchTimeout + 30000);
+
if (useHttps) {
+ builder.usingHttps();
if (useSelfSignedCerts) {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps()
- .allowSelfSignedCertificates();
- } else {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps();
+ builder.allowSelfSignedCertificates();
}
- } else {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
}
if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
+ builder.authenticatedBy(apiKey, apiSecret);
+ }
+
+ if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
}
try {