diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java | 32 |
1 files changed, 22 insertions, 10 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index db240b3d..70c37d55 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -122,28 +122,40 @@ public interface BusConsumer { public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { + this(servers, topic, apiKey, apiSecret, null, null, + consumerGroup, consumerInstance, fetchTimeout, fetchLimit, + useHttps, useSelfSignedCerts); + } + + public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String username, String password, + String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { this.fetchTimeout = fetchTimeout; this.builder = new CambriaClientBuilders.ConsumerBuilder(); + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + + // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(fetchTimeout + 30000); + if (useHttps) { + builder.usingHttps(); if (useSelfSignedCerts) { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps() - .allowSelfSignedCertificates(); - } else { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps(); + builder.allowSelfSignedCertificates(); } - } else { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); } if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + builder.authenticatedBy(apiKey, apiSecret); + } + + if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); } try { |