summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java56
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java34
3 files changed, 41 insertions, 55 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index 72e7d497..e4afd3ae 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -63,11 +63,11 @@ public class DmaapConsumerJsonParser {
/**
* Extract info from string and create @see {@link FileData}.
*
- * @param monoMessage - results from DMaaP
+ * @param rawMessage - results from DMaaP
* @return reactive Mono with an array of FileData
*/
- public Mono<List<FileData>> getJsonObject(Mono<String> monoMessage) {
- return monoMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) {
+ return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
index ad9e6fe7..f32b22c4 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -2,31 +2,28 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.consumer;
import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.function.Consumer;
-import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -36,8 +33,6 @@ import reactor.core.publisher.Mono;
*/
public class DmaapConsumerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private WebClient webClient;
private final String dmaapHostName;
private final String dmaapProtocol;
@@ -45,6 +40,7 @@ public class DmaapConsumerReactiveHttpClient {
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String contentType;
/**
* Constructor of DmaapConsumerReactiveHttpClient.
@@ -58,6 +54,7 @@ public class DmaapConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
}
/**
@@ -66,21 +63,16 @@ public class DmaapConsumerReactiveHttpClient {
* @return reactive response from DMaaP in string format
*/
public Mono<String> getDmaapConsumerResponse() {
- try {
- return webClient
- .get()
- .uri(getUri())
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
+ return webClient.get().uri(getUri()).headers(getHeaders()).retrieve()
+ .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400")))
+ .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
.bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.error("Unable to parse URI in message from xNF.", e);
- return Mono.error(e);
- }
+ }
+
+ private Consumer<HttpHeaders> getHeaders() {
+ return httpHeaders -> {
+ httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
+ };
}
private String createRequestPath() {
@@ -92,8 +84,8 @@ public class DmaapConsumerReactiveHttpClient {
return this;
}
- URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(createRequestPath()).build();
+ URI getUri() {
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(createRequestPath()).build();
}
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
index 4f96a903..4568bdde 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
@@ -51,8 +51,8 @@ class DmaapConsumerReactiveHttpClientTest {
private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
private Mono<String> expectedResult = Mono.empty();
private WebClient webClient;
- private RequestHeadersUriSpec requestHeadersSpec;
- private ResponseSpec responseSpec;
+ private RequestHeadersUriSpec requestHeadersSpecMock;
+ private ResponseSpec responseSpecMock;
@BeforeEach
@@ -73,8 +73,8 @@ class DmaapConsumerReactiveHttpClientTest {
.filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
consumerConfigurationMock.dmaapUserPassword()))
.build());
- requestHeadersSpec = mock(RequestHeadersUriSpec.class);
- responseSpec = mock(ResponseSpec.class);
+ requestHeadersSpecMock = mock(RequestHeadersUriSpec.class);
+ responseSpecMock = mock(ResponseSpec.class);
}
@@ -85,8 +85,9 @@ class DmaapConsumerReactiveHttpClientTest {
//when
mockDependantObjects();
- doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
+ doReturn(expectedResult).when(responseSpecMock).bodyToMono(String.class);
dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
+
Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse();
//then
@@ -98,24 +99,17 @@ class DmaapConsumerReactiveHttpClientTest {
}
@Test
- void getHttpResponse_whenUriSyntaxExceptionHasBeenThrown() throws URISyntaxException {
- //given
- dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient);
- //when
- when(webClient.get()).thenReturn(requestHeadersSpec);
- dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
- when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
-
- //then
- StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).expectSubscription()
- .expectError(Exception.class).verify();
+ void getAppropriateUri_whenPassingCorrectedUriData() throws URISyntaxException {
+ Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
+ URI.create("https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDCAE-c12/c12"));
}
private void mockDependantObjects() {
- when(webClient.get()).thenReturn(requestHeadersSpec);
- when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
- when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
- doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ when(webClient.get()).thenReturn(requestHeadersSpecMock);
+ when(requestHeadersSpecMock.uri((URI) any())).thenReturn(requestHeadersSpecMock);
+ when(requestHeadersSpecMock.headers(any())).thenReturn(requestHeadersSpecMock);
+ when(requestHeadersSpecMock.retrieve()).thenReturn(responseSpecMock);
+ doReturn(responseSpecMock).when(responseSpecMock).onStatus(any(), any());
}
} \ No newline at end of file