aboutsummaryrefslogtreecommitdiffstats
path: root/prh-dmaap-client
diff options
context:
space:
mode:
Diffstat (limited to 'prh-dmaap-client')
-rw-r--r--prh-dmaap-client/pom.xml10
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java38
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java156
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java96
4 files changed, 189 insertions, 111 deletions
diff --git a/prh-dmaap-client/pom.xml b/prh-dmaap-client/pom.xml
index 9234518d..0633b46a 100644
--- a/prh-dmaap-client/pom.xml
+++ b/prh-dmaap-client/pom.xml
@@ -54,6 +54,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-reactor-netty</artifactId>
+ <version>2.0.4.RELEASE</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
@@ -109,5 +114,10 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
index a99833dc..cb7d5af2 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -17,14 +17,12 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-
package org.onap.dcaegen2.services.prh.service.consumer;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Optional;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.slf4j.Logger;
@@ -49,6 +47,9 @@ public class DmaapConsumerReactiveHttpClient {
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String dmaapContentType;
+ private final String dmaapUserName;
+ private final String dmaapUserPassword;
public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
this.dmaapHostName = consumerConfiguration.dmaapHostName();
@@ -57,17 +58,21 @@ public class DmaapConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
- String dmaapContentType = consumerConfiguration.dmaapContentType();
+ this.dmaapContentType = consumerConfiguration.dmaapContentType();
+ this.dmaapUserName = consumerConfiguration.dmaapUserName();
+ this.dmaapUserPassword = consumerConfiguration.dmaapUserPassword();
+ }
+
+ public void initWebClient() {
this.webClient = WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType)
- .filter(
- basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword()))
+ .filter(basicAuthentication(dmaapUserName, dmaapUserPassword))
.filter(logRequest())
.filter(logResponse())
.build();
}
- public Mono<Optional<String>> getDmaaPConsumerResponse() {
+ public Mono<String> getDmaaPConsumerResponse() {
try {
return webClient
.get()
@@ -78,31 +83,34 @@ public class DmaapConsumerReactiveHttpClient {
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
Mono.error(new Exception("HTTP 500")))
- .bodyToMono(String.class)
- .map(Optional::of);
+ .bodyToMono(String.class);
} catch (URISyntaxException e) {
logger.warn("Exception while executing HTTP request: ", e);
return Mono.error(e);
}
}
- private URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(createRequestPath()).build();
- }
-
private String createRequestPath() {
return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
}
- private ExchangeFilterFunction logResponse() {
+ void initWebClient(WebClient webClient) {
+ this.webClient = webClient;
+ }
+
+ ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
logger.info("Response Status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
- private ExchangeFilterFunction logRequest() {
+ URI getUri() throws URISyntaxException {
+ return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+ .setPath(createRequestPath()).build();
+ }
+
+ ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java
new file mode 100644
index 00000000..63966602
--- /dev/null
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java
@@ -0,0 +1,156 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.service.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/27/18
+ */
+public class DmaapConsumerReactiveHttpClientTest {
+
+ private static DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+
+ private static DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class);
+ private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
+ private static Mono<String> expectedResult = Mono.empty();
+ private static WebClient webClient = mock(WebClient.class);
+ private static RequestHeadersUriSpec requestHeadersSpec;
+ private static ResponseSpec responseSpec;
+
+
+ @BeforeAll
+ public static void setUp() {
+ when(consumerConfigurationMock.dmaapHostName()).thenReturn("54.45.33.2");
+ when(consumerConfigurationMock.dmaapProtocol()).thenReturn("https");
+ when(consumerConfigurationMock.dmaapPortNumber()).thenReturn(1234);
+ when(consumerConfigurationMock.dmaapUserName()).thenReturn("PRH");
+ when(consumerConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
+ when(consumerConfigurationMock.dmaapContentType()).thenReturn("application/json");
+ when(consumerConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
+ when(consumerConfigurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
+ when(consumerConfigurationMock.consumerId()).thenReturn("c12");
+
+ dmaapConsumerReactiveHttpClient = new DmaapConsumerReactiveHttpClient(consumerConfigurationMock);
+ webClient = spy(WebClient.builder()
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, consumerConfigurationMock.dmaapContentType())
+ .filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
+ consumerConfigurationMock.dmaapUserPassword()))
+ .filter(dmaapConsumerReactiveHttpClient.logRequest())
+ .filter(dmaapConsumerReactiveHttpClient.logResponse())
+ .build());
+ requestHeadersSpec = mock(RequestHeadersUriSpec.class);
+ responseSpec = mock(ResponseSpec.class);
+ }
+
+
+ @Test
+ public void getHttpResponse_Success() {
+ //given
+ expectedResult = Mono.just(JSON_MESSAGE);
+
+ //when
+ mockDependantObjects();
+ doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
+ dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+ Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse();
+
+ //then
+ StepVerifier.create(response).expectSubscription()
+ .expectNextMatches(results -> {
+ Assertions.assertEquals(results, expectedResult.block());
+ return true;
+ }).verifyComplete();
+ }
+
+
+ @Test
+ public void getHttpResponse_HttpResponse4xxClientError() {
+
+ //when
+ mockDependantObjects();
+ doAnswer(invocationOnMock -> Mono.error(new Exception("400")))
+ .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("400")));
+ dmaapConsumerReactiveHttpClient.initWebClient();
+ dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+
+ //then
+ StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+ .expectError(Exception.class);
+
+ }
+
+ @Test
+ public void getHttpResponse_HttpResponse5xxClientError() {
+
+ //when
+ mockDependantObjects();
+ doAnswer(invocationOnMock -> Mono.error(new Exception("500")))
+ .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("500")));
+ dmaapConsumerReactiveHttpClient.initWebClient();
+ dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+
+ //then
+ StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+ .expectError(Exception.class);
+ }
+
+ @Test
+ public void getHttpResponse_whenURISyntaxExceptionHasBeenThrown() throws URISyntaxException {
+ //given
+ dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient);
+ //when
+ when(webClient.get()).thenReturn(requestHeadersSpec);
+ dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+ when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
+
+ //then
+ StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+ .expectError(Exception.class);
+ }
+
+ private void mockDependantObjects() {
+ when(webClient.get()).thenReturn(requestHeadersSpec);
+ when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
+ when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
+ doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ }
+
+} \ No newline at end of file
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java
deleted file mode 100644
index d2c0e77b..00000000
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-///*-
-// * ============LICENSE_START=======================================================
-// * PNF-REGISTRATION-HANDLER
-// * ================================================================================
-// * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
-// * ================================================================================
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// * ============LICENSE_END=========================================================
-// */
-//
-//package org.onap.dcaegen2.services.prh.service.consumer;
-//
-//import org.apache.http.client.ResponseHandler;
-//import org.apache.http.client.methods.HttpGet;
-//import org.apache.http.impl.client.CloseableHttpClient;
-//import org.junit.jupiter.api.Assertions;
-//import org.junit.jupiter.api.BeforeAll;
-//import org.junit.jupiter.api.Test;
-//import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-//
-//import java.io.IOException;
-//import java.lang.reflect.Field;
-//import java.util.Optional;
-//
-//import static org.mockito.ArgumentMatchers.any;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.when;
-//
-//
-//public class ExtendedDmaapConsumerHttpClientImplTest {
-//
-// private static ExtendedDmaapConsumerHttpClientImpl objectUnderTest;
-//
-// private static DmaapConsumerConfiguration configurationMock = mock(DmaapConsumerConfiguration.class);
-// private static CloseableHttpClient closeableHttpClientMock = mock(CloseableHttpClient.class);
-//
-// private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\" }";
-//
-// private static Optional<String> expectedResult = Optional.empty();
-//
-// @BeforeAll
-// public static void init() throws NoSuchFieldException, IllegalAccessException {
-//
-// when(configurationMock.dmaapHostName()).thenReturn("54.45.33.2");
-// when(configurationMock.dmaapProtocol()).thenReturn("https");
-// when(configurationMock.dmaapPortNumber()).thenReturn(1234);
-// when(configurationMock.dmaapUserName()).thenReturn("PRH");
-// when(configurationMock.dmaapUserPassword()).thenReturn("PRH");
-// when(configurationMock.dmaapContentType()).thenReturn("application/json");
-// when(configurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
-// when(configurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
-// when(configurationMock.consumerId()).thenReturn("c12");
-//
-// objectUnderTest = new ExtendedDmaapConsumerHttpClientImpl(configurationMock);
-//
-// setField();
-// }
-//
-//
-// @Test
-// public void getHttpResponseGet_success() throws IOException {
-// expectedResult = Optional.of(JSON_MESSAGE);
-//
-// when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class)))
-// .thenReturn(expectedResult);
-//
-// Optional<String> actualResult = objectUnderTest.getHttpConsumerResponse();
-//
-// Assertions.assertEquals(expectedResult.get(), actualResult.get());
-// }
-//
-// @Test
-// public void getExtendedDetails_returnsNull() throws IOException {
-// when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class))).
-// thenReturn(Optional.empty());
-// Optional<String> actualResult = objectUnderTest.getHttpConsumerResponse();
-// Assertions.assertEquals(Optional.empty(),actualResult);
-// }
-//
-//
-// private static void setField() throws NoSuchFieldException, IllegalAccessException {
-// Field field = objectUnderTest.getClass().getDeclaredField("closeableHttpClient");
-// field.setAccessible(true);
-// field.set(objectUnderTest, closeableHttpClientMock);
-// }
-//}