diff options
3 files changed, 41 insertions, 55 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index 72e7d497..e4afd3ae 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -63,11 +63,11 @@ public class DmaapConsumerJsonParser { /** * Extract info from string and create @see {@link FileData}. * - * @param monoMessage - results from DMaaP + * @param rawMessage - results from DMaaP * @return reactive Mono with an array of FileData */ - public Mono<List<FileData>> getJsonObject(Mono<String> monoMessage) { - return monoMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); + public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) { + return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java index ad9e6fe7..f32b22c4 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -2,31 +2,28 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.service.consumer; import java.net.URI; -import java.net.URISyntaxException; +import java.util.function.Consumer; -import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; @@ -36,8 +33,6 @@ import reactor.core.publisher.Mono; */ public class DmaapConsumerReactiveHttpClient { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private WebClient webClient; private final String dmaapHostName; private final String dmaapProtocol; @@ -45,6 +40,7 @@ public class DmaapConsumerReactiveHttpClient { private final String dmaapTopicName; private final String consumerGroup; private final String consumerId; + private final String contentType; /** * Constructor of DmaapConsumerReactiveHttpClient. @@ -58,6 +54,7 @@ public class DmaapConsumerReactiveHttpClient { this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); this.consumerGroup = consumerConfiguration.consumerGroup(); this.consumerId = consumerConfiguration.consumerId(); + this.contentType = consumerConfiguration.dmaapContentType(); } /** @@ -66,21 +63,16 @@ public class DmaapConsumerReactiveHttpClient { * @return reactive response from DMaaP in string format */ public Mono<String> getDmaapConsumerResponse() { - try { - return webClient - .get() - .uri(getUri()) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("HTTP 400")) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("HTTP 500"))) + return webClient.get().uri(getUri()).headers(getHeaders()).retrieve() + .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400"))) + .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500"))) .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.error("Unable to parse URI in message from xNF.", e); - return Mono.error(e); - } + } + + private Consumer<HttpHeaders> getHeaders() { + return httpHeaders -> { + httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType); + }; } private String createRequestPath() { @@ -92,8 +84,8 @@ public class DmaapConsumerReactiveHttpClient { return this; } - URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(createRequestPath()).build(); + URI getUri() { + return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) + .path(createRequestPath()).build(); } } diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java index 4f96a903..4568bdde 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java @@ -51,8 +51,8 @@ class DmaapConsumerReactiveHttpClientTest { private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}"; private Mono<String> expectedResult = Mono.empty(); private WebClient webClient; - private RequestHeadersUriSpec requestHeadersSpec; - private ResponseSpec responseSpec; + private RequestHeadersUriSpec requestHeadersSpecMock; + private ResponseSpec responseSpecMock; @BeforeEach @@ -73,8 +73,8 @@ class DmaapConsumerReactiveHttpClientTest { .filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(), consumerConfigurationMock.dmaapUserPassword())) .build()); - requestHeadersSpec = mock(RequestHeadersUriSpec.class); - responseSpec = mock(ResponseSpec.class); + requestHeadersSpecMock = mock(RequestHeadersUriSpec.class); + responseSpecMock = mock(ResponseSpec.class); } @@ -85,8 +85,9 @@ class DmaapConsumerReactiveHttpClientTest { //when mockDependantObjects(); - doReturn(expectedResult).when(responseSpec).bodyToMono(String.class); + doReturn(expectedResult).when(responseSpecMock).bodyToMono(String.class); dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient); + Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse(); //then @@ -98,24 +99,17 @@ class DmaapConsumerReactiveHttpClientTest { } @Test - void getHttpResponse_whenUriSyntaxExceptionHasBeenThrown() throws URISyntaxException { - //given - dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient); - //when - when(webClient.get()).thenReturn(requestHeadersSpec); - dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient); - when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class); - - //then - StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).expectSubscription() - .expectError(Exception.class).verify(); + void getAppropriateUri_whenPassingCorrectedUriData() throws URISyntaxException { + Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(), + URI.create("https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDCAE-c12/c12")); } private void mockDependantObjects() { - when(webClient.get()).thenReturn(requestHeadersSpec); - when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec); - when(requestHeadersSpec.retrieve()).thenReturn(responseSpec); - doReturn(responseSpec).when(responseSpec).onStatus(any(), any()); + when(webClient.get()).thenReturn(requestHeadersSpecMock); + when(requestHeadersSpecMock.uri((URI) any())).thenReturn(requestHeadersSpecMock); + when(requestHeadersSpecMock.headers(any())).thenReturn(requestHeadersSpecMock); + when(requestHeadersSpecMock.retrieve()).thenReturn(responseSpecMock); + doReturn(responseSpecMock).when(responseSpecMock).onStatus(any(), any()); } }
\ No newline at end of file |