summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2018-04-13 16:34:35 -0400
committerJim Hahn <jrh3@att.com>2018-04-16 08:50:18 -0400
commit4390de5f7da75b09afdaa368a1ddcea2b1f0c43d (patch)
treeebe3f5c7b344f24e352df72dc426d3fd37945c3a /policy-endpoints/src/main/java/org/onap
parent3fd9dc0e5c584702d25982172bb5ee44b6b57aa3 (diff)
Switch to cambria client 1.2.1-oss
Switched to new version of cambria client. Added code to set cambria socket timeout. Modified code to use cambria client instead of dmaap client. Removed extra, unnecessary items that had been added to the top-level pom.xml Change-Id: If71d36f50da5423ec0cf21b30e66aff5b1c9222a Issue-ID: POLICY-742 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java32
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java30
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java11
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java7
4 files changed, 52 insertions, 28 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index db240b3d..70c37d55 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -122,28 +122,40 @@ public interface BusConsumer {
public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout,
int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
+ this(servers, topic, apiKey, apiSecret, null, null,
+ consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
+ useHttps, useSelfSignedCerts);
+ }
+
+ public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String username, String password,
+ String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
this.fetchTimeout = fetchTimeout;
this.builder = new CambriaClientBuilders.ConsumerBuilder();
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+
+ // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(fetchTimeout + 30000);
+
if (useHttps) {
+ builder.usingHttps();
if (useSelfSignedCerts) {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps()
- .allowSelfSignedCertificates();
- } else {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps();
+ builder.allowSelfSignedCertificates();
}
- } else {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
}
if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
+ builder.authenticatedBy(apiKey, apiSecret);
+ }
+
+ if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
}
try {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
index 852c9c16..8e18bba8 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
@@ -74,18 +74,23 @@ public interface BusPublisher {
public CambriaPublisherWrapper(List<String> servers, String topic,
String apiKey,
String apiSecret, boolean useHttps) {
+ this(servers, topic, apiKey, apiSecret, null, null, useHttps);
+ }
+
+ public CambriaPublisherWrapper(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String username, String password,
+ boolean useHttps) {
+
PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- if (useHttps){
-
- builder.usingHosts(servers)
- .onTopic(topic)
- .usingHttps();
- }
- else{
- builder.usingHosts(servers)
- .onTopic(topic);
+ builder.usingHosts(servers).onTopic(topic);
+
+ // Set read timeout to 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(30000);
+
+ if (useHttps){
+ builder.usingHttps();
}
@@ -94,6 +99,11 @@ public interface BusPublisher {
builder.authenticatedBy(apiKey, apiSecret);
}
+ if (username != null && !username.isEmpty() &&
+ password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
+ }
+
try {
this.publisher = builder.build();
} catch (MalformedURLException | GeneralSecurityException e) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
index 912607fd..718bb21d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -114,10 +114,11 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
(this.longitude == null || this.longitude.isEmpty()) &&
(this.partner == null || this.partner.isEmpty())) {
this.publisher =
- new BusPublisher.DmaapAafPublisherWrapper(this.servers,
+ new BusPublisher.CambriaPublisherWrapper(this.servers,
this.topic,
- this.userName,
- this.password, this.useHttps);
+ this.apiKey, this.apiSecret,
+ this.userName, this.password,
+ this.useHttps);
} else {
this.publisher =
new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic,
@@ -148,4 +149,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
return builder.toString();
}
-} \ No newline at end of file
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index 6c1bc8a0..88d67fd2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -162,11 +162,12 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
(this.longitude == null || this.longitude.isEmpty()) &&
(this.partner == null || this.partner.isEmpty())) {
this.consumer =
- new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic,
+ new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
this.apiKey, this.apiSecret,
this.userName, this.password,
this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit, this.useHttps);
+ this.fetchTimeout, this.fetchLimit,
+ this.useHttps, this.allowSelfSignedCerts);
} else {
this.consumer =
new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,