summaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java
diff options
context:
space:
mode:
authorpkaras <piotr.karas@nokia.com>2018-11-05 16:04:54 +0100
committerpkaras <piotr.karas@nokia.com>2018-11-05 16:10:17 +0100
commite8a80102a45458b3f1d15e07dc0a63e1370c44a7 (patch)
tree374b4fc2d3b7577da8612ab92998f0e25090f147 /prh-app-server/src/main/java
parent600c05b1530c120b34370e86e92dfd79421474fe (diff)
DmaaP Publisher and consumer interfaces ssl setup
Change-Id: I971d0fb222c6e8e15de4fc4a4d9eeb4ef3a99f44 Issue-ID: DCAEGEN2-944 Signed-off-by: piotr.karas <piotr.karas@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java3
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java26
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java19
4 files changed, 20 insertions, 30 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 2f947d47..58b29d2e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -34,7 +33,5 @@ interface DmaapConsumerTask {
Flux<ConsumerDmaapModel> execute(String object);
- Flux<ConsumerDmaapModel> consume(Mono<String> message);
-
DMaaPConsumerReactiveHttpClient resolveClient();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index c4d9c44a..a52163b7 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -22,15 +22,15 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.services.prh.service.consumer.ConsumerReactiveHttpClientFactory;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.consumer.DMaaPReactiveWebClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -41,18 +41,20 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private final DMaaPReactiveWebClient dmaapReactiveWebClient;
+ private final ConsumerReactiveHttpClientFactory httpClientFactory;
@Autowired
public DmaapConsumerTaskImpl(Config config) {
- this(config, new DmaapConsumerJsonParser(), new DMaaPReactiveWebClient());
+ this(config, new DmaapConsumerJsonParser(),
+ new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClient()));
}
- DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser,
- DMaaPReactiveWebClient dmaapReactiveWebClient) {
+ DmaapConsumerTaskImpl(Config prhAppConfig,
+ DmaapConsumerJsonParser dmaapConsumerJsonParser,
+ ConsumerReactiveHttpClientFactory httpClientFactory) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- this.dmaapReactiveWebClient = dmaapReactiveWebClient;
+ this.httpClientFactory = httpClientFactory;
}
@Override
@@ -64,17 +66,11 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
public Flux<ConsumerDmaapModel> execute(String object) {
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
LOGGER.debug("Method called with arg {}", object);
- return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
- }
-
- @Override
- public Flux<ConsumerDmaapModel> consume(Mono<String> message) {
- return dmaapConsumerJsonParser.getJsonObject(message);
+ return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
}
@Override
public DMaaPConsumerReactiveHttpClient resolveClient() {
- return new DMaaPConsumerReactiveHttpClient(
- config.getDmaapConsumerConfiguration()).createDMaaPWebClient(dmaapReactiveWebClient.build());
+ return httpClientFactory.create(config.getDmaapConsumerConfiguration());
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index dc0a4488..3f59815b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -33,7 +33,5 @@ interface DmaapPublisherTask {
Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
- Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
-
DMaaPPublisherReactiveHttpClient resolveClient();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index fdc5e625..76c1bb56 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -24,12 +24,12 @@ import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.producer.DMaaPPublisherReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.producer.PublisherReactiveHttpClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
-import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
@@ -40,11 +40,16 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
private final Config config;
- private DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient;
+ private final PublisherReactiveHttpClientFactory httpClientFactory;
@Autowired
public DmaapPublisherTaskImpl(Config config) {
+ this(config, new PublisherReactiveHttpClientFactory());
+ }
+
+ DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) {
this.config = config;
+ this.httpClientFactory = httpClientFactory;
}
@Override
@@ -52,19 +57,13 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- dmaapPublisherReactiveHttpClient = resolveClient();
+ DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
- return publish(consumerDmaapModel);
- }
-
- @Override
- public Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) {
return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
}
@Override
public DMaaPPublisherReactiveHttpClient resolveClient() {
- return new DMaaPPublisherReactiveHttpClient(config.getDmaapPublisherConfiguration())
- .createDMaaPWebClient(new RestTemplate());
+ return httpClientFactory.create(config.getDmaapPublisherConfiguration());
}
} \ No newline at end of file