aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-08-08 13:39:14 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-08-09 07:55:43 +0200
commit5005d7463fb8ef25f0b4e975d4392367037c7239 (patch)
treee1ef2c5f5bc99d9514c13e1068f3b56c1881a4c1
parent7679a6f8177d47116aa8dbae0b38f8b0a8174dc5 (diff)
Reactive A&AI client
*plugged reactiveHttpClient in prh workflow *added junit tests for workflow Change-Id: I74f3fa7354de9b0f7f164c070ea61b70e74bde23 Issue-ID: DCAEGEN2-609 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
-rw-r--r--prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiProducerClient.java145
-rw-r--r--prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/AaiProducerClientTest.java166
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java19
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java47
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java12
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java66
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java6
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java2
9 files changed, 74 insertions, 395 deletions
diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiProducerClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiProducerClient.java
deleted file mode 100644
index b5368c6d..00000000
--- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiProducerClient.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.service;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Base64;
-import java.util.Map;
-import java.util.Optional;
-
-import java.util.function.Predicate;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPatch;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
-import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.prh.model.CommonFunctions;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AaiProducerClient implements AaiExtendedHttpClient {
-
- private static final String EXCEPTION_MESSAGE = "Exception while executing http client: ";
- private static Predicate<String> isEmpty = String::isEmpty;
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final CloseableHttpClient closeableHttpClient;
- private final String aaiHost;
- private final String aaiProtocol;
- private final Integer aaiHostPortNumber;
- private final String aaiPath;
- private final Map<String, String> aaiHeaders;
- private final String aaiUserName;
- private final String aaiUserPassword;
-
- /**
- * A{@literal &}AI client for publishing data to A{@literal &}AI.
- *
- * @param aaiClientConfiguration - confiuration for A{@literal &}AI
- */
- public AaiProducerClient(AaiClientConfiguration aaiClientConfiguration) {
- closeableHttpClient = new AaiClientImpl(aaiClientConfiguration).getAaiHttpClient();
- aaiHost = aaiClientConfiguration.aaiHost();
- aaiProtocol = aaiClientConfiguration.aaiProtocol();
- aaiHostPortNumber = aaiClientConfiguration.aaiPort();
- aaiPath = aaiClientConfiguration.aaiBasePath() + aaiClientConfiguration.aaiPnfPath();
- aaiHeaders = aaiClientConfiguration.aaiHeaders();
- aaiUserName = aaiClientConfiguration.aaiUserName();
- aaiUserPassword = aaiClientConfiguration.aaiUserPassword();
- }
-
-
- @Override
- public Optional<Integer> getHttpResponse(ConsumerDmaapModel consumerDmaapModel) throws URISyntaxException {
- return createRequest(consumerDmaapModel).flatMap(httpRequestBase -> {
- try {
- return closeableHttpClient.execute(httpRequestBase, this::handleResponse);
- } catch (IOException e) {
- logger.warn(EXCEPTION_MESSAGE, e);
- return Optional.empty();
- }
- });
- }
-
- private Optional<HttpRequestBase> createRequest(ConsumerDmaapModel consumerDmaapModel) throws URISyntaxException {
- final URI extendedUri = createAaiExtendedUri(consumerDmaapModel.getPnfName());
- return createHttpRequest(extendedUri, consumerDmaapModel);
- }
-
- private URI createAaiExtendedUri(final String pnfName) throws URISyntaxException {
- return new URIBuilder()
- .setScheme(aaiProtocol)
- .setHost(aaiHost)
- .setPort(aaiHostPortNumber)
- .setPath(aaiPath + "/" + pnfName).build();
- }
-
- private Optional<HttpRequestBase> createHttpRequest(URI extendedUri, ConsumerDmaapModel consumerDmaapModel) {
- return Optional.ofNullable(CommonFunctions.createJsonBody(consumerDmaapModel)).filter(isEmpty.negate())
- .flatMap(myJson -> {
- try {
- logger.info("AAI: sending json {}", myJson);
- return Optional.of(createHttpPatch(extendedUri, myJson));
- } catch (UnsupportedEncodingException e) {
- logger.warn(EXCEPTION_MESSAGE, e);
- }
- return Optional.empty();
- });
- }
-
- HttpPatch createHttpPatch(URI extendedUri, String jsonBody) throws UnsupportedEncodingException {
- HttpPatch httpPatch = new HttpPatch(extendedUri);
- httpPatch.setEntity(new StringEntity(jsonBody));
- aaiHeaders.forEach(httpPatch::addHeader);
- httpPatch.addHeader("Content-Type", "application/merge-patch+json");
- httpPatch.addHeader("Authorization", "Basic " + encode());
- return httpPatch;
- }
-
- String encode() throws UnsupportedEncodingException {
- return Base64.getEncoder().encodeToString((this.aaiUserName + ":" + this.aaiUserPassword)
- .getBytes("UTF-8"));
- }
-
- Optional<Integer> handleResponse(HttpResponse response) throws IOException {
-
- final Integer responseCode = response.getStatusLine().getStatusCode();
- logger.info("Status code of operation: {}", responseCode);
- final HttpEntity responseEntity = response.getEntity();
-
- if (HttpUtils.isSuccessfulResponseCode(responseCode)) {
- logger.trace("HTTP response successful.");
- return Optional.of(responseCode);
- } else {
- String aaiResponse = responseEntity != null ? EntityUtils.toString(responseEntity) : "";
- logger.warn("HTTP response not successful : {}", aaiResponse);
- return Optional.of(responseCode);
- }
- }
-}
diff --git a/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/AaiProducerClientTest.java b/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/AaiProducerClientTest.java
deleted file mode 100644
index 13d4dcec..00000000
--- a/prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/AaiProducerClientTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.service;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.StatusLine;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpPatch;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
-
-class AaiProducerClientTest {
-
- private static final Integer SUCCESS = 200;
- private static final HttpResponse httpResponseMock = mock(HttpResponse.class);
- private static final HttpEntity httpEntityMock = mock(HttpEntity.class);
- private static final StatusLine statusLineMock = mock(StatusLine.class);
-
- private static AaiProducerClient testedObject;
- private static AaiClientConfiguration aaiHttpClientConfigurationMock = mock(AaiClientConfiguration.class);
- private static CloseableHttpClient closeableHttpClientMock = mock(CloseableHttpClient.class);
- private static ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
-
- @BeforeAll
- static void setup() throws NoSuchFieldException, IllegalAccessException {
- when(aaiHttpClientConfigurationMock.aaiHost()).thenReturn("eucalyptus.es-si-eu-dhn-20.eecloud.nsn-net.net");
- when(aaiHttpClientConfigurationMock.aaiProtocol()).thenReturn("https");
- when(aaiHttpClientConfigurationMock.aaiPort()).thenReturn(1234);
- when(aaiHttpClientConfigurationMock.aaiUserName()).thenReturn("PRH");
- when(aaiHttpClientConfigurationMock.aaiUserPassword()).thenReturn("PRH");
- when(aaiHttpClientConfigurationMock.aaiBasePath()).thenReturn("/aai/v11");
- when(aaiHttpClientConfigurationMock.aaiPnfPath()).thenReturn("/network/pnfs/pnf");
- when(aaiHttpClientConfigurationMock.aaiHeaders()).thenReturn(setupHeaders());
-
- testedObject = new AaiProducerClient(aaiHttpClientConfigurationMock);
- setField();
- }
-
- @Test
- void getHttpResponse_shouldReturnSuccessStatusCode() throws IOException, URISyntaxException {
- // when
- when(closeableHttpClientMock.execute(any(HttpPatch.class), any(ResponseHandler.class)))
- .thenReturn(Optional.of(SUCCESS));
- Optional<Integer> actualResult = testedObject.getHttpResponse(consumerDmaapModel);
- // then
- assertEquals(SUCCESS, actualResult.get());
- }
-
- @Test
- void getHttpResponse_shouldHandleIoException() throws IOException, URISyntaxException {
- // when
- when(closeableHttpClientMock.execute(any(HttpPatch.class), any(ResponseHandler.class)))
- .thenThrow(new IOException("Error occur"));
-
- // then
- assertNotNull(testedObject.getHttpResponse(consumerDmaapModel));
- }
-
- @Test
- void createHttpRequest_shouldCatchUnsupportedEncodingException() throws URISyntaxException, IOException {
- // when
- when(closeableHttpClientMock.execute(any(HttpPatch.class), any(ResponseHandler.class)))
- .thenThrow(new UnsupportedEncodingException("A new Error"));
- // then
- assertNotNull(testedObject.getHttpResponse(consumerDmaapModel));
- }
-
- @Test
- void encode_shouldCreateEncodedString_whenUserAndPasswordAreSet() throws UnsupportedEncodingException {
- // given
- String expected = "UFJIOlBSSA==";
- // when
- String result = testedObject.encode();
- // then
- assertNotNull(result);
- assertEquals(expected, result);
- }
-
- @Test
- void createHttpPatch_shouldContainAuthorizationBasicValue() throws UnsupportedEncodingException {
- // given
- String expected = "Authorization: Basic UFJIOlBSSA==";
- // when
- HttpPatch patch = testedObject.createHttpPatch(URI.create("localhost"), "{}");
- // then
- assertNotNull(patch);
- assertEquals(expected, patch.getLastHeader("Authorization").toString());
- }
-
- @Test
- void handleResponse_shouldReturn200() throws IOException {
- // When
- when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
- when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
- when(httpResponseMock.getStatusLine().getStatusCode()).thenReturn(HttpStatus.SC_OK);
- // Then
- assertEquals(Optional.of(HttpStatus.SC_OK), testedObject.handleResponse(httpResponseMock));
- }
-
- @Test
- void handleResponse_shouldReturn300() throws IOException {
- // When
- when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
- when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
- when(httpResponseMock.getStatusLine().getStatusCode()).thenReturn(HttpStatus.SC_BAD_REQUEST);
- // Then
- assertEquals(Optional.of(HttpStatus.SC_BAD_REQUEST), testedObject.handleResponse(httpResponseMock));
- }
-
-
- private static void setField() throws NoSuchFieldException, IllegalAccessException {
- Field field = testedObject.getClass().getDeclaredField("closeableHttpClient");
- field.setAccessible(true);
- field.set(testedObject, closeableHttpClientMock);
- }
-
- private static Map<String, String> setupHeaders() {
- Map<String, String> aaiHeaders = new HashMap<>();
- aaiHeaders.put("X-FromAppId", "prh");
- aaiHeaders.put("X-TransactionId", "vv-temp");
- aaiHeaders.put("Accept", "application/json");
- aaiHeaders.put("Real-Time", "true");
- aaiHeaders.put("Content-Type", "application/merge-patch+json");
- return aaiHeaders;
-
- }
-}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
index abd4fc45..f58fed61 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
@@ -20,19 +20,30 @@
package org.onap.dcaegen2.services.prh.tasks;
+import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.AaiProducerClient;
+import org.onap.dcaegen2.services.prh.service.AaiReactiveWebClient;
+import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
public abstract class AaiProducerTask {
- abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AaiNotFoundException;
+ abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException;
- abstract AaiProducerClient resolveClient();
+ abstract AaiProducerReactiveHttpClient resolveClient();
- protected abstract ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+ protected abstract AaiClientConfiguration resolveConfiguration();
+
+ protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel)
+ throws PrhTaskException;
+
+ WebClient buildWebClient() {
+ return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
+ }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
index 124a5c63..eed65c64 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
@@ -20,19 +20,20 @@
package org.onap.dcaegen2.services.prh.tasks;
-import java.net.URISyntaxException;
-import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
-import org.onap.dcaegen2.services.prh.service.AaiProducerClient;
+import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -44,7 +45,7 @@ public class AaiProducerTaskImpl extends
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
- private AaiProducerClient aaiProducerClient;
+ private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
@Autowired
public AaiProducerTaskImpl(AppConfig prhAppConfig) {
@@ -52,33 +53,37 @@ public class AaiProducerTaskImpl extends
}
@Override
- ConsumerDmaapModel publish(ConsumerDmaapModel consumerDmaapModel) throws AaiNotFoundException {
+ Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
logger.info("Sending PNF model to AAI {}", consumerDmaapModel);
- try {
- return aaiProducerClient.getHttpResponse(consumerDmaapModel)
- .filter(HttpUtils::isSuccessfulResponseCode).map(response -> consumerDmaapModel).orElseThrow(() ->
- new AaiNotFoundException("Incorrect response code for continuation of tasks workflow"));
- } catch (URISyntaxException e) {
- logger.warn("Patch request not successful", e);
- throw new AaiNotFoundException("Patch request not successful");
- }
+ return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel)
+ .flatMap(response -> {
+ if (HttpUtils.isSuccessfulResponseCode(response)) {
+ return consumerDmaapModel;
+ }
+ return Mono
+ .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow"));
+ });
}
@Override
- public ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws AaiNotFoundException {
- consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
- .orElseThrow(() -> new AaiNotFoundException("Invoked null object to AAI task"));
- logger.trace("Method called with arg {}", consumerDmaapModel);
- aaiProducerClient = resolveClient();
- return publish(consumerDmaapModel);
+ AaiProducerReactiveHttpClient resolveClient() {
+ return aaiProducerReactiveHttpClient == null ? new AaiProducerReactiveHttpClient(resolveConfiguration())
+ .createAaiWebClient(buildWebClient()) : aaiProducerReactiveHttpClient;
}
+ @Override
protected AaiClientConfiguration resolveConfiguration() {
return prhAppConfig.getAaiClientConfiguration();
}
@Override
- AaiProducerClient resolveClient() {
- return Optional.ofNullable(aaiProducerClient).orElseGet(() -> new AaiProducerClient(resolveConfiguration()));
+ protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException {
+ if (consumerDmaapModel == null) {
+ throw new DmaapNotFoundException("Invoked null object to DMaaP task");
+ }
+ aaiProducerReactiveHttpClient = resolveClient();
+ logger.trace("Method called with arg {}", consumerDmaapModel);
+ return publish(consumerDmaapModel);
+
}
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 4949faa7..1a641fd4 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -20,7 +20,6 @@
package org.onap.dcaegen2.services.prh.tasks;
-import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
@@ -57,8 +56,9 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
@Override
public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
- consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
- .orElseThrow(() -> new DmaapNotFoundException("Invoked null object to DMaaP task"));
+ if (consumerDmaapModel == null) {
+ throw new DmaapNotFoundException("Invoked null object to DMaaP task");
+ }
dmaapProducerReactiveHttpClient = resolveClient();
logger.trace("Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 664eb33c..c021abe2 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -95,13 +95,11 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
- return monoDMaaPModel.flatMap(dmaapModel -> {
- try {
- return Mono.just(aaiProducerTask.execute(dmaapModel));
- } catch (PrhTaskException e) {
- return Mono.error(e);
- }
- });
+ try {
+ return aaiProducerTask.execute(monoDMaaPModel);
+ } catch (PrhTaskException e) {
+ return Mono.error(e);
+ }
}
private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) {
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java
index 90206122..e3d8fe2f 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java
@@ -29,24 +29,19 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.config.ImmutableAaiClientConfiguration;
-
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
-import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
-
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-
-import org.onap.dcaegen2.services.prh.service.AaiProducerClient;
+import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/14/18
@@ -64,7 +59,7 @@ class AaiProducerTaskImplTest {
private static ConsumerDmaapModel consumerDmaapModel;
private static AaiProducerTaskImpl aaiProducerTask;
private static AaiClientConfiguration aaiClientConfiguration;
- private static AaiProducerClient aaiProducerClient;
+ private static AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
private static AppConfig appConfig;
@BeforeAll
@@ -99,57 +94,38 @@ class AaiProducerTaskImplTest {
}
@Test
- void whenPassedObjectFits_ReturnsCorrectStatus() throws AaiNotFoundException, URISyntaxException {
+ void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException {
//given/when
- getAaiProducerTask_whenMockingResponseObject(200, false);
- ConsumerDmaapModel response = aaiProducerTask.execute(consumerDmaapModel);
+ getAaiProducerTask_whenMockingResponseObject(200);
+ Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(Mono.just(consumerDmaapModel));
//then
- verify(aaiProducerClient, times(1)).getHttpResponse(any(ConsumerDmaapModel.class));
- verifyNoMoreInteractions(aaiProducerClient);
- Assertions.assertEquals(consumerDmaapModel, response);
+ verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any());
+ verifyNoMoreInteractions(aaiProducerReactiveHttpClient);
+ Assertions.assertEquals(consumerDmaapModel, response.block());
}
@Test
- void whenPassedObjectFits_butIncorrectResponseReturns() throws URISyntaxException {
- //given/when
- getAaiProducerTask_whenMockingResponseObject(400, false);
- Executable executableCode = () -> aaiProducerTask.execute(consumerDmaapModel);
- Assertions
- .assertThrows(PrhTaskException.class, executableCode, "Incorrect status code in response message");
- //then
- verify(aaiProducerClient, times(1)).getHttpResponse(any(ConsumerDmaapModel.class));
- verifyNoMoreInteractions(aaiProducerClient);
- }
-
- @Test
- void whenPassedObjectFits_butHttpClientThrowsIoExceptionHandleIt() throws URISyntaxException {
+ void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException {
//given/when
- getAaiProducerTask_whenMockingResponseObject(0, true);
-
- Executable executableCode = () -> aaiProducerTask.execute(consumerDmaapModel);
- Assertions
- .assertThrows(PrhTaskException.class, executableCode, "");
+ getAaiProducerTask_whenMockingResponseObject(400);
+ StepVerifier.create(aaiProducerTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
+ .expectError(PrhTaskException.class).verify();
//then
- verify(aaiProducerClient, times(1)).getHttpResponse(any(ConsumerDmaapModel.class));
- verifyNoMoreInteractions(aaiProducerClient);
+ verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any());
+ verifyNoMoreInteractions(aaiProducerReactiveHttpClient);
}
-
- private static void getAaiProducerTask_whenMockingResponseObject(int statusCode, boolean throwsException)
- throws URISyntaxException {
+ private static void getAaiProducerTask_whenMockingResponseObject(Integer statusCode) {
//given
- aaiProducerClient = mock(AaiProducerClient.class);
- if (throwsException) {
- when(aaiProducerClient.getHttpResponse(consumerDmaapModel)).thenThrow(URISyntaxException.class);
- } else {
- when(aaiProducerClient.getHttpResponse(consumerDmaapModel)).thenReturn(Optional.of(statusCode));
- }
+ aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class);
+ when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any()))
+ .thenReturn(Mono.just(statusCode));
when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration);
aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig));
when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration);
- doReturn(aaiProducerClient).when(aaiProducerTask).resolveClient();
+ doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient();
}
} \ No newline at end of file
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
index 9b0292b8..82dcdae9 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.spy;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
-import org.onap.dcaegen2.services.prh.service.AaiProducerClient;
+import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@@ -48,9 +48,9 @@ public class AaiPublisherTaskSpy {
AppConfig appConfig = spy(AppConfig.class);
doReturn(mock(AaiClientConfiguration.class)).when(appConfig).getAaiClientConfiguration();
AaiProducerTaskImpl aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig));
- AaiProducerClient aaiProducerClient = mock(AaiProducerClient.class);
+ AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class);
doReturn(mock(AaiClientConfiguration.class)).when(aaiProducerTask).resolveConfiguration();
- doReturn(aaiProducerClient).when(aaiProducerTask).resolveClient();
+ doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient();
return aaiProducerTask;
}
}
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
index 9a46b2a9..b0d503af 100644
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
@@ -50,7 +50,7 @@ class DMaaPConsumerReactiveHttpClientTest {
private DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class);
private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
private Mono<String> expectedResult = Mono.empty();
- private WebClient webClient = mock(WebClient.class);
+ private WebClient webClient;
private RequestHeadersUriSpec requestHeadersSpec;
private ResponseSpec responseSpec;