summaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-dmaap-client/src')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java21
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java10
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java17
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java (renamed from datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java)20
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java30
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java (renamed from datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java)26
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java96
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java151
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClientTest.java58
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java34
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java27
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java (renamed from datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClientTest.java)35
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClientTest.java121
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java129
14 files changed, 390 insertions, 385 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
index 57b11127..dd7519f9 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,14 +13,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.config;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
import org.springframework.stereotype.Component;
/**
@@ -43,20 +40,20 @@ public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig {
public abstract String consumerGroup();
@Value.Parameter
- public abstract Integer timeoutMs();
+ public abstract Integer timeoutMS();
@Value.Parameter
public abstract Integer messageLimit();
- public interface Builder extends
- DmaapCustomConfig.Builder<DmaapConsumerConfiguration, DmaapConsumerConfiguration.Builder> {
+ public interface Builder
+ extends DmaapCustomConfig.Builder<DmaapConsumerConfiguration, DmaapConsumerConfiguration.Builder> {
Builder consumerId(String consumerId);
Builder consumerGroup(String consumerGroup);
- Builder timeoutMs(Integer timeoutMs);
+ Builder timeoutMS(Integer timeoutMS);
Builder messageLimit(Integer messageLimit);
}
@@ -65,4 +62,4 @@ public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig {
return ImmutableDmaapConsumerConfiguration.builder();
}
-} \ No newline at end of file
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
index 31bbfc0e..0b1d99eb 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.config;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
index cd520569..d0918446 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,16 +13,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.config;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -36,8 +31,8 @@ public abstract class DmaapPublisherConfiguration implements DmaapCustomConfig {
private static final long serialVersionUID = 1L;
- interface Builder extends
- DmaapCustomConfig.Builder<DmaapPublisherConfiguration, DmaapPublisherConfiguration.Builder> {
+ interface Builder
+ extends DmaapCustomConfig.Builder<DmaapPublisherConfiguration, DmaapPublisherConfiguration.Builder> {
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index b4cbfeea..d5878b0d 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -23,18 +21,18 @@ package org.onap.dcaegen2.collectors.datafile.service;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig;
-import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
-public class DMaaPReactiveWebClient {
+public class DmaapReactiveWebClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -43,12 +41,12 @@ public class DMaaPReactiveWebClient {
private String dmaaPUserPassword;
/**
- * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig.
+ * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
*
* @param dmaapCustomConfig - configuration object
- * @return DMaaPReactiveWebClient
+ * @return DmaapReactiveWebClient
*/
- public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
+ public DmaapReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
new file mode 100644
index 00000000..2b44233f
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import org.apache.http.HttpStatus;
+
+public final class HttpUtils implements HttpStatus {
+
+ private HttpUtils() {}
+
+ public static boolean isSuccessfulResponseCode(Integer statusCode) {
+ return statusCode >= 200 && statusCode < 300;
+ }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
index 1fcebeac..ad9e6fe7 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,26 +13,28 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.consumer;
import java.net.URI;
import java.net.URISyntaxException;
+
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DMaaPConsumerReactiveHttpClient {
+public class DmaapConsumerReactiveHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -47,11 +47,11 @@ public class DMaaPConsumerReactiveHttpClient {
private final String consumerId;
/**
- * Constructor of DMaaPConsumerReactiveHttpClient.
+ * Constructor of DmaapConsumerReactiveHttpClient.
*
* @param consumerConfiguration - DMaaP consumer configuration object
*/
- public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
+ public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
this.dmaapHostName = consumerConfiguration.dmaapHostName();
this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
@@ -65,7 +65,7 @@ public class DMaaPConsumerReactiveHttpClient {
*
* @return reactive response from DMaaP in string format
*/
- public Mono<String> getDMaaPConsumerResponse() {
+ public Mono<String> getDmaapConsumerResponse() {
try {
return webClient
.get()
@@ -78,7 +78,7 @@ public class DMaaPConsumerReactiveHttpClient {
Mono.error(new Exception("HTTP 500")))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI ");
+ logger.error("Unable to parse URI in message from xNF.", e);
return Mono.error(e);
}
}
@@ -87,7 +87,7 @@ public class DMaaPConsumerReactiveHttpClient {
return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
}
- public DMaaPConsumerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
+ public DmaapConsumerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
this.webClient = webClient;
return this;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java
deleted file mode 100644
index c6889df4..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service.producer;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.apache.http.client.utils.URIBuilder;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
- */
-public class DMaaPProducerReactiveHttpClient {
-
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- private WebClient webClient;
- private final String dmaapHostName;
- private final Integer dmaapPortNumber;
- private final String dmaapProtocol;
- private final String dmaapTopicName;
-
- /**
- * Constructor DMaaPProducerReactiveHttpClient.
- *
- * @param dmaapPublisherConfiguration - DMaaP producer configuration object
- */
- public DMaaPProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
- this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
- this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
- this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
- this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
- }
-
- /**
- * Function for calling DMaaP HTTP producer - post request to DMaaP.
- *
- * @param consumerDmaapModelMono - object which will be sent to DMaaP
- * @return status code of operation
- */
- public Mono<String> getDMaaPProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
- try {
- return webClient
- .post()
- .uri(getUri())
- .body(BodyInserters.fromObject(consumerDmaapModelMono))
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
- .bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI");
- return Mono.error(e);
- }
- }
-
- public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
- this.webClient = webClient;
- return this;
- }
-
- URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(dmaapTopicName).build();
- }
-
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
new file mode 100644
index 00000000..8010bdc1
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.utils.URIBuilder;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class DmaapProducerReactiveHttpClient {
+
+ private static final String X_ATT_DR_META = "X-ATT-DR-META";
+ private static final String LOCATION = "location";
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private WebClient webClient;
+ private final String dmaapHostName;
+ private final Integer dmaapPortNumber;
+ private final String dmaapProtocol;
+ private final String dmaapTopicName;
+ private final String dmaapContentType;
+
+ /**
+ * Constructor DmaapProducerReactiveHttpClient.
+ *
+ * @param dmaapPublisherConfiguration - DMaaP producer configuration object
+ */
+ public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+
+ this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
+ this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
+ this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
+ this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
+ }
+
+ /**
+ * Function for calling DMaaP HTTP producer - post request to DMaaP.
+ *
+ * @param consumerDmaapModelMono - object which will be sent to DMaaP
+ * @return status code of operation
+ */
+ public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
+ consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
+ return Mono.just(HttpStatus.OK.toString());
+ }
+
+ public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
+ this.webClient = webClient;
+ return this;
+ }
+
+ private void postFilesAndData(List<ConsumerDmaapModel> models) {
+ for (ConsumerDmaapModel consumerDmaapModel : models) {
+ postFileAndData(consumerDmaapModel);
+ }
+ }
+
+ private void postFileAndData(ConsumerDmaapModel model) {
+ RequestBodyUriSpec post = webClient.post();
+
+ boolean headPrepared = prepareHead(model, post);
+
+ if (headPrepared) {
+ prepareBody(model, post);
+
+ ResponseSpec responseSpec = post.retrieve();
+ responseSpec.onStatus(HttpStatus::is4xxClientError,
+ clientResponse -> handlePostErrors(model, clientResponse));
+ responseSpec.onStatus(HttpStatus::is5xxServerError,
+ clientResponse -> handlePostErrors(model, clientResponse));
+ String bodyToMono = responseSpec.bodyToMono(String.class).block();
+ }
+ }
+
+ private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
+ boolean result = true;
+ try {
+ post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ metaData.getAsJsonObject().remove(LOCATION);
+ post.header(X_ATT_DR_META, metaData.toString());
+
+ post.uri(getUri());
+ } catch (Exception e) {
+ logger.error("Unable to post file to Data Router. " + model, e);
+ result = false;
+ }
+
+ return result;
+ }
+
+ private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
+ String fileLocation = model.getLocation();
+ File fileResource = new File(fileLocation);
+ FileSystemResource httpResource = new FileSystemResource(fileResource);
+ post.body(BodyInserters.fromResource(httpResource));
+ }
+
+ private URI getUri() throws URISyntaxException {
+ return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+ .setPath(dmaapTopicName).build();
+ }
+
+ private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
+ String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
+ logger.error(errorMessage);
+
+ return Mono.error(new Exception(errorMessage));
+ }
+}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClientTest.java
deleted file mode 100644
index dea22340..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClientTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient;
-import org.springframework.web.reactive.function.client.WebClient;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/5/18
- */
-class DMaaPReactiveWebClientTest {
-
-
- @Test
- void builder_shouldBuildDMaaPReactiveWebClient() {
- //given
- DmaapConsumerConfiguration dmaapConsumerConfiguration = mock(DmaapConsumerConfiguration.class);
- String dmaaPContentType = "*/*";
- String dmaaPUserName = "DMaaP";
- String dmaaPUserPassword = "DMaaP";
-
- //when
- when(dmaapConsumerConfiguration.dmaapContentType()).thenReturn(dmaaPContentType);
- when(dmaapConsumerConfiguration.dmaapUserName()).thenReturn(dmaaPUserName);
- when(dmaapConsumerConfiguration.dmaapUserPassword()).thenReturn(dmaaPUserPassword);
- WebClient dmaapreactiveWebClient = new DMaaPReactiveWebClient()
- .fromConfiguration(dmaapConsumerConfiguration)
- .build();
-
- //then
- Assertions.assertNotNull(dmaapreactiveWebClient);
-
- }
-} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java
index d0943709..b67946b2 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.config;
@@ -25,10 +23,10 @@ import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
-class DmaapConsumerConfigurationTest {
+public class DmaapConsumerConfigurationTest {
@Test
- void builder_shouldBuildConfigurationObject() {
+ public void builder_shouldBuildConfigurationObject() {
// Given
DmaapConsumerConfiguration configuration;
@@ -45,19 +43,11 @@ class DmaapConsumerConfigurationTest {
Integer messageLimit = 1000;
// When
- configuration = new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerId(consumerId)
- .dmaapHostName(dmaapHostName)
- .dmaapPortNumber(dmaapPortNumber)
- .dmaapTopicName(dmaapTopicName)
- .dmaapProtocol(dmaapProtocol)
- .dmaapUserName(dmaapUserName)
- .dmaapUserPassword(dmaapUserPassword)
- .dmaapContentType(dmaapContentType)
- .consumerGroup(consumerGroup)
- .timeoutMs(timeoutMs)
- .messageLimit(messageLimit)
- .build();
+ configuration = new ImmutableDmaapConsumerConfiguration.Builder().consumerId(consumerId)
+ .dmaapHostName(dmaapHostName).dmaapPortNumber(dmaapPortNumber).dmaapTopicName(dmaapTopicName)
+ .dmaapProtocol(dmaapProtocol).dmaapUserName(dmaapUserName).dmaapUserPassword(dmaapUserPassword)
+ .dmaapContentType(dmaapContentType).consumerGroup(consumerGroup).timeoutMS(timeoutMs)
+ .messageLimit(messageLimit).build();
// Then
Assertions.assertNotNull(configuration);
@@ -69,7 +59,7 @@ class DmaapConsumerConfigurationTest {
Assertions.assertEquals(dmaapUserName, configuration.dmaapUserName());
Assertions.assertEquals(dmaapUserPassword, configuration.dmaapUserPassword());
Assertions.assertEquals(consumerGroup, configuration.consumerGroup());
- Assertions.assertEquals(timeoutMs, configuration.timeoutMs());
+ Assertions.assertEquals(timeoutMs, configuration.timeoutMS());
Assertions.assertEquals(messageLimit, configuration.messageLimit());
}
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java
index a188921c..fb8e8751 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.config;
@@ -25,11 +23,11 @@ import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
-class DmaapPublisherConfigurationTest {
+public class DmaapPublisherConfigurationTest {
@Test
- void builder_shouldBuildConfigurationObject() {
+ public void builder_shouldBuildConfigurationObject() {
// Given
DmaapPublisherConfiguration configuration;
@@ -42,15 +40,10 @@ class DmaapPublisherConfigurationTest {
String dmaapContentType = "application/json";
// When
- configuration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapHostName(dmaapHostName)
- .dmaapPortNumber(dmaapPortNumber)
- .dmaapTopicName(dmaapTopicName)
- .dmaapProtocol(dmaapProtocol)
- .dmaapUserName(dmaapUserName)
- .dmaapUserPassword(dmaapUserPassword)
- .dmaapContentType(dmaapContentType)
- .build();
+ configuration = new ImmutableDmaapPublisherConfiguration.Builder().dmaapHostName(dmaapHostName)
+ .dmaapPortNumber(dmaapPortNumber).dmaapTopicName(dmaapTopicName).dmaapProtocol(dmaapProtocol)
+ .dmaapUserName(dmaapUserName).dmaapUserPassword(dmaapUserPassword).dmaapContentType(dmaapContentType)
+ .build();
// Then
Assertions.assertNotNull(configuration);
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
index abac3cd5..4f96a903 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.consumer;
@@ -29,24 +27,25 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun
import java.net.URI;
import java.net.URISyntaxException;
+
+import org.apache.http.HttpHeaders;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/27/18
*/
-class DMaaPConsumerReactiveHttpClientTest {
+class DmaapConsumerReactiveHttpClientTest {
- private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+ private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
private DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class);
private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
@@ -61,14 +60,14 @@ class DMaaPConsumerReactiveHttpClientTest {
when(consumerConfigurationMock.dmaapHostName()).thenReturn("54.45.33.2");
when(consumerConfigurationMock.dmaapProtocol()).thenReturn("https");
when(consumerConfigurationMock.dmaapPortNumber()).thenReturn(1234);
- when(consumerConfigurationMock.dmaapUserName()).thenReturn("Datafile");
- when(consumerConfigurationMock.dmaapUserPassword()).thenReturn("Datafile");
+ when(consumerConfigurationMock.dmaapUserName()).thenReturn("DATAFILE");
+ when(consumerConfigurationMock.dmaapUserPassword()).thenReturn("DATFILE");
when(consumerConfigurationMock.dmaapContentType()).thenReturn("application/json");
- when(consumerConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
+ when(consumerConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.VES_NOTIFICATION_OUTPUT");
when(consumerConfigurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
when(consumerConfigurationMock.consumerId()).thenReturn("c12");
- dmaapConsumerReactiveHttpClient = new DMaaPConsumerReactiveHttpClient(consumerConfigurationMock);
+ dmaapConsumerReactiveHttpClient = new DmaapConsumerReactiveHttpClient(consumerConfigurationMock);
webClient = spy(WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, consumerConfigurationMock.dmaapContentType())
.filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
@@ -87,8 +86,8 @@ class DMaaPConsumerReactiveHttpClientTest {
//when
mockDependantObjects();
doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
- dmaapConsumerReactiveHttpClient.createDMaaPWebClient(webClient);
- Mono<String> response = dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse();
+ dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
+ Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse();
//then
StepVerifier.create(response).expectSubscription()
@@ -104,11 +103,11 @@ class DMaaPConsumerReactiveHttpClientTest {
dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient);
//when
when(webClient.get()).thenReturn(requestHeadersSpec);
- dmaapConsumerReactiveHttpClient.createDMaaPWebClient(webClient);
+ dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
//then
- StepVerifier.create(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).expectSubscription()
+ StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).expectSubscription()
.expectError(Exception.class).verify();
}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClientTest.java
deleted file mode 100644
index bb1ce19d..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClientTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service.producer;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModelForUnitTest;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
- */
-class DMaaPProducerReactiveHttpClientTest {
-
- private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
-
- private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(
- DmaapPublisherConfiguration.class);
- private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
- private WebClient webClient = mock(WebClient.class);
- private RequestBodyUriSpec requestBodyUriSpec;
- private ResponseSpec responseSpec;
-
-
- @BeforeEach
- void setUp() {
- when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn("54.45.33.2");
- when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn("https");
- when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(1234);
- when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("Datafile");
- when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("Datafile");
- when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json");
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady");
-
- dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
-
- webClient = spy(WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType())
- .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(),
- dmaapPublisherConfigurationMock.dmaapUserPassword()))
- .build());
- requestBodyUriSpec = mock(RequestBodyUriSpec.class);
- responseSpec = mock(ResponseSpec.class);
- }
-
- @Test
- void getHttpResponse_Success() {
- //given
- Integer responseSuccess = 200;
- Mono<Integer> expectedResult = Mono.just(responseSuccess);
-
- //when
- mockWebClientDependantObject();
- doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
- Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(Mono.just(consumerDmaapModel));
-
- //then
- Assertions.assertEquals(response.block(), expectedResult.block());
- }
-
- @Test
- void getHttpResponse_whenUriSyntaxExceptionHasBeenThrown() throws URISyntaxException {
- //given
- dmaapProducerReactiveHttpClient = spy(dmaapProducerReactiveHttpClient);
- //when
- when(webClient.post()).thenReturn(requestBodyUriSpec);
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
- when(dmaapProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
-
- //then
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(any())).expectSubscription()
- .expectError(Exception.class).verify();
- }
-
- private void mockWebClientDependantObject() {
- RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class);
- when(webClient.post()).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec);
- when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec);
- doReturn(responseSpec).when(requestHeadersSpec).retrieve();
- doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
- }
-} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
new file mode 100644
index 00000000..213e8d77
--- /dev/null
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -0,0 +1,129 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.http.client.utils.URIBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModelForUnitTest;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+class DmaapProducerReactiveHttpClientTest {
+
+ private static final String LOCATION = "location";
+ private static final String X_ATT_DR_META = "X-ATT-DR-META";
+
+ private static final String HOST = "54.45.33.2";
+ private static final String HTTPS_SCHEME = "https";
+ private static final int PORT = 1234;
+ private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+ private static final String FILE_READY_TOPIC = "fileReady";
+
+ private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+
+ private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
+ private WebClient webClientMock = mock(WebClient.class);
+ private RequestBodyUriSpec requestBodyUriSpecMock;
+ private ResponseSpec responseSpecMock;
+
+
+ @BeforeEach
+ void setUp() {
+ when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("DATAFILE");
+ when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("DATAFILE");
+ when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE);
+ when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(FILE_READY_TOPIC);
+
+ dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
+
+ webClientMock = spy(WebClient.builder()
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType())
+ .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(),
+ dmaapPublisherConfigurationMock.dmaapUserPassword()))
+ .build());
+ requestBodyUriSpecMock = mock(RequestBodyUriSpec.class);
+ responseSpecMock = mock(ResponseSpec.class);
+ }
+
+ @Test
+ void getHttpResponse_Success() {
+ // given
+
+ // when
+ mockWebClientDependantObject();
+ dmaapProducerReactiveHttpClient.createDmaapWebClient(webClientMock);
+ List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>();
+ consumerDmaapModelList.add(consumerDmaapModel);
+
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponse(Mono.just(consumerDmaapModelList));
+
+ // then
+ verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
+ metaData.getAsJsonObject().remove(LOCATION);
+ verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString());
+ URI expectedUri = null;
+ try {
+ expectedUri = new URIBuilder().setScheme(HTTPS_SCHEME).setHost(HOST).setPort(1234).setPath(FILE_READY_TOPIC)
+ .build();
+ } catch (URISyntaxException e) {
+ // Nothing
+ }
+ verify(requestBodyUriSpecMock).uri(expectedUri);
+ verify(requestBodyUriSpecMock).body(any());
+ }
+
+ private void mockWebClientDependantObject() {
+ when(webClientMock.post()).thenReturn(requestBodyUriSpecMock);
+ when(requestBodyUriSpecMock.uri((URI) any())).thenReturn(requestBodyUriSpecMock);
+
+ when(requestBodyUriSpecMock.retrieve()).thenReturn(responseSpecMock);
+ when(responseSpecMock.onStatus(any(), any())).thenReturn(responseSpecMock);
+ Mono<String> expectedResult = Mono.just("200");
+ when(responseSpecMock.bodyToMono(String.class)).thenReturn(expectedResult);
+ }
+}