diff options
author | Jim Hahn <jrh3@att.com> | 2018-04-13 16:34:35 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2018-04-16 08:50:18 -0400 |
commit | 4390de5f7da75b09afdaa368a1ddcea2b1f0c43d (patch) | |
tree | ebe3f5c7b344f24e352df72dc426d3fd37945c3a /policy-endpoints | |
parent | 3fd9dc0e5c584702d25982172bb5ee44b6b57aa3 (diff) |
Switch to cambria client 1.2.1-oss
Switched to new version of cambria client.
Added code to set cambria socket timeout.
Modified code to use cambria client instead of dmaap client.
Removed extra, unnecessary items that had been added to
the top-level pom.xml
Change-Id: If71d36f50da5423ec0cf21b30e66aff5b1c9222a
Issue-ID: POLICY-742
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints')
5 files changed, 53 insertions, 33 deletions
diff --git a/policy-endpoints/pom.xml b/policy-endpoints/pom.xml index 9f8cabc9..8664646c 100644 --- a/policy-endpoints/pom.xml +++ b/policy-endpoints/pom.xml @@ -2,7 +2,7 @@ ============LICENSE_START======================================================= ONAP Policy Engine - Drools PDP ================================================================================ - Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -51,10 +51,6 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> - <exclusion> - <groupId>com.att.nsa</groupId> - <artifactId>saClientLibrary</artifactId> - </exclusion> </exclusions> </dependency> diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index db240b3d..70c37d55 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -122,28 +122,40 @@ public interface BusConsumer { public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { + this(servers, topic, apiKey, apiSecret, null, null, + consumerGroup, consumerInstance, fetchTimeout, fetchLimit, + useHttps, useSelfSignedCerts); + } + + public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String username, String password, + String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { this.fetchTimeout = fetchTimeout; this.builder = new CambriaClientBuilders.ConsumerBuilder(); + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + + // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(fetchTimeout + 30000); + if (useHttps) { + builder.usingHttps(); if (useSelfSignedCerts) { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps() - .allowSelfSignedCertificates(); - } else { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps(); + builder.allowSelfSignedCertificates(); } - } else { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); } if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + builder.authenticatedBy(apiKey, apiSecret); + } + + if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); } try { diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java index 852c9c16..8e18bba8 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java @@ -74,18 +74,23 @@ public interface BusPublisher { public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret, boolean useHttps) { + this(servers, topic, apiKey, apiSecret, null, null, useHttps); + } + + public CambriaPublisherWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String username, String password, + boolean useHttps) { + PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - if (useHttps){ - - builder.usingHosts(servers) - .onTopic(topic) - .usingHttps(); - } - else{ - builder.usingHosts(servers) - .onTopic(topic); + builder.usingHosts(servers).onTopic(topic); + + // Set read timeout to 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(30000); + + if (useHttps){ + builder.usingHttps(); } @@ -94,6 +99,11 @@ public interface BusPublisher { builder.authenticatedBy(apiKey, apiSecret); } + if (username != null && !username.isEmpty() && + password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); + } + try { this.publisher = builder.build(); } catch (MalformedURLException | GeneralSecurityException e) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java index 912607fd..718bb21d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,10 +114,11 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop (this.longitude == null || this.longitude.isEmpty()) && (this.partner == null || this.partner.isEmpty())) { this.publisher = - new BusPublisher.DmaapAafPublisherWrapper(this.servers, + new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, - this.userName, - this.password, this.useHttps); + this.apiKey, this.apiSecret, + this.userName, this.password, + this.useHttps); } else { this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, @@ -148,4 +149,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop return builder.toString(); } -}
\ No newline at end of file +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index 6c1bc8a0..88d67fd2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -162,11 +162,12 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource (this.longitude == null || this.longitude.isEmpty()) && (this.partner == null || this.partner.isEmpty())) { this.consumer = - new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, this.useHttps); + this.fetchTimeout, this.fetchLimit, + this.useHttps, this.allowSelfSignedCerts); } else { this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, |