summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorelinuxhenrik <henrik.b.andersson@est.tech>2018-09-19 08:49:28 +0200
committerelinuxhenrik <henrik.b.andersson@est.tech>2018-09-21 16:46:38 +0200
commit90b9931873e13247e937a6d4b5206c65033b306e (patch)
tree23e667e9d68958edc49a94bb5a02778a6cc45713
parent3468d474187ef01546bdf1180d11453a4f924d31 (diff)
Use correct Content Type from DMaaP
Add the Content Type to the header the DMaaP consumer uses to get the fileReady event from DMAaaP to prevent formatting problems. Change-Id: Iedf38b7542e5709a78f383d31c75e7b95aa56cfe Issue-ID: DCAEGEN2-825 Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java6
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java56
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java34
3 files changed, 41 insertions, 55 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index 72e7d497..e4afd3ae 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -63,11 +63,11 @@ public class DmaapConsumerJsonParser {
/**
* Extract info from string and create @see {@link FileData}.
*
- * @param monoMessage - results from DMaaP
+ * @param rawMessage - results from DMaaP
* @return reactive Mono with an array of FileData
*/
- public Mono<List<FileData>> getJsonObject(Mono<String> monoMessage) {
- return monoMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+ public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) {
+ return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
index ad9e6fe7..f32b22c4 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -2,31 +2,28 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.consumer;
import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.function.Consumer;
-import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -36,8 +33,6 @@ import reactor.core.publisher.Mono;
*/
public class DmaapConsumerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private WebClient webClient;
private final String dmaapHostName;
private final String dmaapProtocol;
@@ -45,6 +40,7 @@ public class DmaapConsumerReactiveHttpClient {
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String contentType;
/**
* Constructor of DmaapConsumerReactiveHttpClient.
@@ -58,6 +54,7 @@ public class DmaapConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
}
/**
@@ -66,21 +63,16 @@ public class DmaapConsumerReactiveHttpClient {
* @return reactive response from DMaaP in string format
*/
public Mono<String> getDmaapConsumerResponse() {
- try {
- return webClient
- .get()
- .uri(getUri())
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
+ return webClient.get().uri(getUri()).headers(getHeaders()).retrieve()
+ .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400")))
+ .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
.bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.error("Unable to parse URI in message from xNF.", e);
- return Mono.error(e);
- }
+ }
+
+ private Consumer<HttpHeaders> getHeaders() {
+ return httpHeaders -> {
+ httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
+ };
}
private String createRequestPath() {
@@ -92,8 +84,8 @@ public class DmaapConsumerReactiveHttpClient {
return this;
}
- URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(createRequestPath()).build();
+ URI getUri() {
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(createRequestPath()).build();
}
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
index 4f96a903..4568bdde 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
@@ -51,8 +51,8 @@ class DmaapConsumerReactiveHttpClientTest {
private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
private Mono<String> expectedResult = Mono.empty();
private WebClient webClient;
- private RequestHeadersUriSpec requestHeadersSpec;
- private ResponseSpec responseSpec;
+ private RequestHeadersUriSpec requestHeadersSpecMock;
+ private ResponseSpec responseSpecMock;
@BeforeEach
@@ -73,8 +73,8 @@ class DmaapConsumerReactiveHttpClientTest {
.filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
consumerConfigurationMock.dmaapUserPassword()))
.build());
- requestHeadersSpec = mock(RequestHeadersUriSpec.class);
- responseSpec = mock(ResponseSpec.class);
+ requestHeadersSpecMock = mock(RequestHeadersUriSpec.class);
+ responseSpecMock = mock(ResponseSpec.class);
}
@@ -85,8 +85,9 @@ class DmaapConsumerReactiveHttpClientTest {
//when
mockDependantObjects();
- doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
+ doReturn(expectedResult).when(responseSpecMock).bodyToMono(String.class);
dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
+
Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse();
//then
@@ -98,24 +99,17 @@ class DmaapConsumerReactiveHttpClientTest {
}
@Test
- void getHttpResponse_whenUriSyntaxExceptionHasBeenThrown() throws URISyntaxException {
- //given
- dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient);
- //when
- when(webClient.get()).thenReturn(requestHeadersSpec);
- dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
- when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
-
- //then
- StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).expectSubscription()
- .expectError(Exception.class).verify();
+ void getAppropriateUri_whenPassingCorrectedUriData() throws URISyntaxException {
+ Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
+ URI.create("https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDCAE-c12/c12"));
}
private void mockDependantObjects() {
- when(webClient.get()).thenReturn(requestHeadersSpec);
- when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
- when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
- doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ when(webClient.get()).thenReturn(requestHeadersSpecMock);
+ when(requestHeadersSpecMock.uri((URI) any())).thenReturn(requestHeadersSpecMock);
+ when(requestHeadersSpecMock.headers(any())).thenReturn(requestHeadersSpecMock);
+ when(requestHeadersSpecMock.retrieve()).thenReturn(responseSpecMock);
+ doReturn(responseSpecMock).when(responseSpecMock).onStatus(any(), any());
}
} \ No newline at end of file