summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2018-04-13 16:34:35 -0400
committerJim Hahn <jrh3@att.com>2018-04-16 08:50:18 -0400
commit4390de5f7da75b09afdaa368a1ddcea2b1f0c43d (patch)
treeebe3f5c7b344f24e352df72dc426d3fd37945c3a
parent3fd9dc0e5c584702d25982172bb5ee44b6b57aa3 (diff)
Switch to cambria client 1.2.1-oss
Switched to new version of cambria client. Added code to set cambria socket timeout. Modified code to use cambria client instead of dmaap client. Removed extra, unnecessary items that had been added to the top-level pom.xml Change-Id: If71d36f50da5423ec0cf21b30e66aff5b1c9222a Issue-ID: POLICY-742 Signed-off-by: Jim Hahn <jrh3@att.com>
-rw-r--r--policy-endpoints/pom.xml6
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java32
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java30
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java11
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java7
-rw-r--r--policy-management/pom.xml11
-rw-r--r--pom.xml2
7 files changed, 64 insertions, 35 deletions
diff --git a/policy-endpoints/pom.xml b/policy-endpoints/pom.xml
index 9f8cabc9..8664646c 100644
--- a/policy-endpoints/pom.xml
+++ b/policy-endpoints/pom.xml
@@ -2,7 +2,7 @@
============LICENSE_START=======================================================
ONAP Policy Engine - Drools PDP
================================================================================
- Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -51,10 +51,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
- <exclusion>
- <groupId>com.att.nsa</groupId>
- <artifactId>saClientLibrary</artifactId>
- </exclusion>
</exclusions>
</dependency>
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index db240b3d..70c37d55 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -122,28 +122,40 @@ public interface BusConsumer {
public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout,
int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
+ this(servers, topic, apiKey, apiSecret, null, null,
+ consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
+ useHttps, useSelfSignedCerts);
+ }
+
+ public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String username, String password,
+ String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
this.fetchTimeout = fetchTimeout;
this.builder = new CambriaClientBuilders.ConsumerBuilder();
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+
+ // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(fetchTimeout + 30000);
+
if (useHttps) {
+ builder.usingHttps();
if (useSelfSignedCerts) {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps()
- .allowSelfSignedCertificates();
- } else {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps();
+ builder.allowSelfSignedCertificates();
}
- } else {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
}
if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
+ builder.authenticatedBy(apiKey, apiSecret);
+ }
+
+ if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
}
try {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
index 852c9c16..8e18bba8 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
@@ -74,18 +74,23 @@ public interface BusPublisher {
public CambriaPublisherWrapper(List<String> servers, String topic,
String apiKey,
String apiSecret, boolean useHttps) {
+ this(servers, topic, apiKey, apiSecret, null, null, useHttps);
+ }
+
+ public CambriaPublisherWrapper(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String username, String password,
+ boolean useHttps) {
+
PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- if (useHttps){
-
- builder.usingHosts(servers)
- .onTopic(topic)
- .usingHttps();
- }
- else{
- builder.usingHosts(servers)
- .onTopic(topic);
+ builder.usingHosts(servers).onTopic(topic);
+
+ // Set read timeout to 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(30000);
+
+ if (useHttps){
+ builder.usingHttps();
}
@@ -94,6 +99,11 @@ public interface BusPublisher {
builder.authenticatedBy(apiKey, apiSecret);
}
+ if (username != null && !username.isEmpty() &&
+ password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
+ }
+
try {
this.publisher = builder.build();
} catch (MalformedURLException | GeneralSecurityException e) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
index 912607fd..718bb21d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -114,10 +114,11 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
(this.longitude == null || this.longitude.isEmpty()) &&
(this.partner == null || this.partner.isEmpty())) {
this.publisher =
- new BusPublisher.DmaapAafPublisherWrapper(this.servers,
+ new BusPublisher.CambriaPublisherWrapper(this.servers,
this.topic,
- this.userName,
- this.password, this.useHttps);
+ this.apiKey, this.apiSecret,
+ this.userName, this.password,
+ this.useHttps);
} else {
this.publisher =
new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic,
@@ -148,4 +149,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
return builder.toString();
}
-} \ No newline at end of file
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index 6c1bc8a0..88d67fd2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -162,11 +162,12 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
(this.longitude == null || this.longitude.isEmpty()) &&
(this.partner == null || this.partner.isEmpty())) {
this.consumer =
- new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic,
+ new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
this.apiKey, this.apiSecret,
this.userName, this.password,
this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit, this.useHttps);
+ this.fetchTimeout, this.fetchLimit,
+ this.useHttps, this.allowSelfSignedCerts);
} else {
this.consumer =
new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,
diff --git a/policy-management/pom.xml b/policy-management/pom.xml
index 2c0f968e..5386e152 100644
--- a/policy-management/pom.xml
+++ b/policy-management/pom.xml
@@ -3,7 +3,7 @@
============LICENSE_START=======================================================
ONAP Policy Engine - Drools PDP
================================================================================
- Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -235,6 +235,15 @@
<version>4.1</version>
</dependency>
+ <!-- if we don't explicitly specify the version here, we seem to end up
+ with version 1.4 (as a dependency to drools-core). This version is
+ not compatible with 'saClientLibrary' version 1.2.1-oss -->
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.9</version>
+ </dependency>
+
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
diff --git a/pom.xml b/pom.xml
index a8d8fab8..f32faba4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
<!-- Project common dependency versions -->
<dmaap.version>1.1.3</dmaap.version>
- <cambria.version>0.0.1</cambria.version>
+ <cambria.version>1.2.1-oss</cambria.version>
<jersey.version>2.25.1</jersey.version>
<jersey.swagger.version>1.5.18</jersey.swagger.version>
<jackson.version>2.9.5</jackson.version>