diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks')
6 files changed, 58 insertions, 42 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index 8e31807a..705b085b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -25,9 +25,11 @@ import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiReactiveWebClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient; + +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -40,14 +42,14 @@ public abstract class AaiProducerTask { abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException; - abstract AaiReactiveHttpPatchClient resolveClient() throws SSLException; + abstract AaiHttpPatchClient resolveClient(); protected abstract AaiClientConfiguration resolveConfiguration(); protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; - WebClient buildWebClient() throws SSLException { - return new AaiReactiveWebClientFactory(new SslFactory(), resolveConfiguration()).build(); + CloudHttpClient buildHttpClient() { + return new AaiHttpClientFactory(resolveConfiguration()).build(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index 355ca905..a34549d8 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -20,22 +20,22 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; -import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl; import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient; - +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClientResponse; /** @@ -47,7 +47,7 @@ public class AaiProducerTaskImpl extends AaiProducerTask { private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class); private final Config config; - private AaiReactiveHttpPatchClient aaiReactiveHttpPatchClient; + private AaiHttpPatchClient aaiHttpPatchClient; @Autowired public AaiProducerTaskImpl(Config config) { @@ -56,20 +56,20 @@ public class AaiProducerTaskImpl extends AaiProducerTask { @Override Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { - LOGGER.info("Publish to AAI DmaapModel"); - return aaiReactiveHttpPatchClient.getAaiProducerResponse(consumerDmaapModel) - .flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) { - return Mono.just(consumerDmaapModel); - } - return Mono - .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); - }); + Mono<HttpClientResponse> resposne = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel); + return resposne.flatMap(response -> { + if (HttpUtils.isSuccessfulResponseCode(response.status().code())) { + return Mono.just(consumerDmaapModel); + } + return Mono + .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + response.status().code())); + }); } @Override - AaiReactiveHttpPatchClient resolveClient() throws SSLException { - return new AaiReactiveHttpPatchClient(resolveConfiguration(), new AaiJsonBodyBuilderImpl()).createAaiWebClient(buildWebClient()); + AaiHttpPatchClient resolveClient() { + return new AaiHttpPatchClient(resolveConfiguration(), + new AaiJsonBodyBuilderImpl()).createAaiHttpClient(new AaiHttpClientFactory(resolveConfiguration()).build()); } @Override @@ -78,12 +78,11 @@ public class AaiProducerTaskImpl extends AaiProducerTask { } @Override - protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) - throws PrhTaskException, SSLException { + protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - aaiReactiveHttpPatchClient = resolveClient(); + aaiHttpPatchClient = resolveClient(); LOGGER.debug("Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index fd7bca1e..69246090 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,6 +20,9 @@ package org.onap.dcaegen2.services.prh.tasks; +import com.google.gson.JsonArray; +import java.util.Objects; +import java.util.Optional; import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** @@ -69,7 +73,10 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask { public Flux<ConsumerDmaapModel> execute(String object) throws SSLException { DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); LOGGER.debug("Method called with arg {}", object); - return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); + + Mono<JsonArray> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse( + Optional.empty()); + return dmaapConsumerJsonParser.getJsonObject(response); } @Override diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 4d6c0f87..ec8ffaff 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -20,11 +20,11 @@ package org.onap.dcaegen2.services.prh.tasks; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; - -import org.springframework.http.ResponseEntity; +import reactor.netty.http.client.HttpClientResponse; import reactor.core.publisher.Mono; /** @@ -32,7 +32,7 @@ import reactor.core.publisher.Mono; */ interface DmaapPublisherTask { - Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; + Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; - DMaaPPublisherReactiveHttpClient resolveClient(); + DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 49accdd4..85b18b8a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -20,11 +20,14 @@ package org.onap.dcaegen2.services.prh.tasks; +import java.util.Optional; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; @@ -32,8 +35,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.produce import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.netty.http.client.HttpClientResponse; import reactor.core.publisher.Mono; /** @@ -43,7 +46,8 @@ import reactor.core.publisher.Mono; public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final Config config; + private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private final PublisherReactiveHttpClientFactory httpClientFactory; @Autowired @@ -52,22 +56,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { } DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) { - this.config = config; + this.dmaapPublisherConfiguration = config.getDmaapPublisherConfiguration(); this.httpClientFactory = httpClientFactory; } @Override - public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { + public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException,SSLException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); + return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel,Optional.empty()); } @Override - public DMaaPPublisherReactiveHttpClient resolveClient() { - return httpClientFactory.create(config.getDmaapPublisherConfiguration()); + public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException{ + return httpClientFactory.create(dmaapPublisherConfiguration); + } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 2924225b..7ecf4a6e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -39,10 +39,10 @@ import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClientResponse; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -108,10 +108,13 @@ public class ScheduledTasks { logger.info("PRH tasks have been completed"); } - private void onSuccess(ResponseEntity<String> responseCode) { - MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + private void onSuccess(HttpClientResponse response) { + + String statusCode = Integer.toString(response.status().code()); + + MDC.put(RESPONSE_CODE, statusCode); logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - responseCode.getStatusCode().value()); + statusCode); MDC.remove(RESPONSE_CODE); } @@ -148,10 +151,10 @@ public class ScheduledTasks { } } - private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { + private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); - } catch (PrhTaskException e) { + } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } |