diff options
7 files changed, 64 insertions, 35 deletions
diff --git a/policy-endpoints/pom.xml b/policy-endpoints/pom.xml index e839b169..f43b4c93 100644 --- a/policy-endpoints/pom.xml +++ b/policy-endpoints/pom.xml @@ -2,7 +2,7 @@ ============LICENSE_START======================================================= ONAP Policy Engine - Drools PDP ================================================================================ - Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -51,10 +51,6 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> - <exclusion> - <groupId>com.att.nsa</groupId> - <artifactId>saClientLibrary</artifactId> - </exclusion> </exclusions> </dependency> diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index db240b3d..70c37d55 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -122,28 +122,40 @@ public interface BusConsumer { public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { + this(servers, topic, apiKey, apiSecret, null, null, + consumerGroup, consumerInstance, fetchTimeout, fetchLimit, + useHttps, useSelfSignedCerts); + } + + public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String username, String password, + String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { this.fetchTimeout = fetchTimeout; this.builder = new CambriaClientBuilders.ConsumerBuilder(); + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + + // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(fetchTimeout + 30000); + if (useHttps) { + builder.usingHttps(); if (useSelfSignedCerts) { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps() - .allowSelfSignedCertificates(); - } else { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps(); + builder.allowSelfSignedCertificates(); } - } else { - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); } if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + builder.authenticatedBy(apiKey, apiSecret); + } + + if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); } try { diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java index 852c9c16..8e18bba8 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java @@ -74,18 +74,23 @@ public interface BusPublisher { public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret, boolean useHttps) { + this(servers, topic, apiKey, apiSecret, null, null, useHttps); + } + + public CambriaPublisherWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String username, String password, + boolean useHttps) { + PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - if (useHttps){ - - builder.usingHosts(servers) - .onTopic(topic) - .usingHttps(); - } - else{ - builder.usingHosts(servers) - .onTopic(topic); + builder.usingHosts(servers).onTopic(topic); + + // Set read timeout to 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(30000); + + if (useHttps){ + builder.usingHttps(); } @@ -94,6 +99,11 @@ public interface BusPublisher { builder.authenticatedBy(apiKey, apiSecret); } + if (username != null && !username.isEmpty() && + password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); + } + try { this.publisher = builder.build(); } catch (MalformedURLException | GeneralSecurityException e) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java index 912607fd..718bb21d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,10 +114,11 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop (this.longitude == null || this.longitude.isEmpty()) && (this.partner == null || this.partner.isEmpty())) { this.publisher = - new BusPublisher.DmaapAafPublisherWrapper(this.servers, + new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, - this.userName, - this.password, this.useHttps); + this.apiKey, this.apiSecret, + this.userName, this.password, + this.useHttps); } else { this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, @@ -148,4 +149,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop return builder.toString(); } -}
\ No newline at end of file +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index 6c1bc8a0..88d67fd2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -162,11 +162,12 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource (this.longitude == null || this.longitude.isEmpty()) && (this.partner == null || this.partner.isEmpty())) { this.consumer = - new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, this.useHttps); + this.fetchTimeout, this.fetchLimit, + this.useHttps, this.allowSelfSignedCerts); } else { this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, diff --git a/policy-management/pom.xml b/policy-management/pom.xml index 75568122..56037f36 100644 --- a/policy-management/pom.xml +++ b/policy-management/pom.xml @@ -3,7 +3,7 @@ ============LICENSE_START======================================================= ONAP Policy Engine - Drools PDP ================================================================================ - Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -235,6 +235,15 @@ <version>4.1</version> </dependency> + <!-- if we don't explicitly specify the version here, we seem to end up + with version 1.4 (as a dependency to drools-core). This version is + not compatible with 'saClientLibrary' version 1.2.1-oss --> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.9</version> + </dependency> + <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> @@ -58,7 +58,7 @@ <!-- Project common dependency versions --> <dmaap.version>1.1.3</dmaap.version> - <cambria.version>0.0.1</cambria.version> + <cambria.version>1.2.1-oss</cambria.version> <jersey.version>2.25.1</jersey.version> <jersey.swagger.version>1.5.18</jersey.swagger.version> <jackson.version>2.9.5</jackson.version> |