From 4390de5f7da75b09afdaa368a1ddcea2b1f0c43d Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Fri, 13 Apr 2018 16:34:35 -0400 Subject: Switch to cambria client 1.2.1-oss Switched to new version of cambria client. Added code to set cambria socket timeout. Modified code to use cambria client instead of dmaap client. Removed extra, unnecessary items that had been added to the top-level pom.xml Change-Id: If71d36f50da5423ec0cf21b30e66aff5b1c9222a Issue-ID: POLICY-742 Signed-off-by: Jim Hahn --- .../event/comm/bus/internal/BusConsumer.java | 32 +++++++++++++++------- .../event/comm/bus/internal/BusPublisher.java | 30 +++++++++++++------- .../comm/bus/internal/InlineDmaapTopicSink.java | 11 ++++---- .../internal/SingleThreadedDmaapTopicSource.java | 7 +++-- 4 files changed, 52 insertions(+), 28 deletions(-) (limited to 'policy-endpoints/src/main/java/org') diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index db240b3d..70c37d55 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -122,28 +122,40 @@ public interface BusConsumer { public CambriaConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { + this(servers, topic, apiKey, apiSecret, null, null, + consumerGroup, consumerInstance, fetchTimeout, fetchLimit, + useHttps, useSelfSignedCerts); + } + + public CambriaConsumerWrapper(List servers, String topic, String apiKey, + String apiSecret, String username, String password, + String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { this.fetchTimeout = fetchTimeout; this.builder = new CambriaClientBuilders.ConsumerBuilder(); + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + + // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(fetchTimeout + 30000); + if (useHttps) { + builder.usingHttps(); if (useSelfSignedCerts) { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps() - .allowSelfSignedCertificates(); - } else { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps(); + builder.allowSelfSignedCertificates(); } - } else { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); } if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + builder.authenticatedBy(apiKey, apiSecret); + } + + if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); } try { diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java index 852c9c16..8e18bba8 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java @@ -74,18 +74,23 @@ public interface BusPublisher { public CambriaPublisherWrapper(List servers, String topic, String apiKey, String apiSecret, boolean useHttps) { + this(servers, topic, apiKey, apiSecret, null, null, useHttps); + } + + public CambriaPublisherWrapper(List servers, String topic, + String apiKey, String apiSecret, + String username, String password, + boolean useHttps) { + PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - if (useHttps){ - - builder.usingHosts(servers) - .onTopic(topic) - .usingHttps(); - } - else{ - builder.usingHosts(servers) - .onTopic(topic); + builder.usingHosts(servers).onTopic(topic); + + // Set read timeout to 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(30000); + + if (useHttps){ + builder.usingHttps(); } @@ -94,6 +99,11 @@ public interface BusPublisher { builder.authenticatedBy(apiKey, apiSecret); } + if (username != null && !username.isEmpty() && + password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); + } + try { this.publisher = builder.build(); } catch (MalformedURLException | GeneralSecurityException e) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java index 912607fd..718bb21d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,10 +114,11 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop (this.longitude == null || this.longitude.isEmpty()) && (this.partner == null || this.partner.isEmpty())) { this.publisher = - new BusPublisher.DmaapAafPublisherWrapper(this.servers, + new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, - this.userName, - this.password, this.useHttps); + this.apiKey, this.apiSecret, + this.userName, this.password, + this.useHttps); } else { this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, @@ -148,4 +149,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop return builder.toString(); } -} \ No newline at end of file +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index 6c1bc8a0..88d67fd2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -162,11 +162,12 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource (this.longitude == null || this.longitude.isEmpty()) && (this.partner == null || this.partner.isEmpty())) { this.consumer = - new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, this.useHttps); + this.fetchTimeout, this.fetchLimit, + this.useHttps, this.allowSelfSignedCerts); } else { this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, -- cgit 1.2.3-korg