summaryrefslogtreecommitdiffstats
path: root/datafile-dmaap-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-dmaap-client/src/main')
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java21
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java10
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java17
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java (renamed from datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java)20
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java30
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java (renamed from datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java)26
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java96
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java151
8 files changed, 222 insertions, 149 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
index 57b11127..dd7519f9 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,14 +13,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.config;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
import org.springframework.stereotype.Component;
/**
@@ -43,20 +40,20 @@ public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig {
public abstract String consumerGroup();
@Value.Parameter
- public abstract Integer timeoutMs();
+ public abstract Integer timeoutMS();
@Value.Parameter
public abstract Integer messageLimit();
- public interface Builder extends
- DmaapCustomConfig.Builder<DmaapConsumerConfiguration, DmaapConsumerConfiguration.Builder> {
+ public interface Builder
+ extends DmaapCustomConfig.Builder<DmaapConsumerConfiguration, DmaapConsumerConfiguration.Builder> {
Builder consumerId(String consumerId);
Builder consumerGroup(String consumerGroup);
- Builder timeoutMs(Integer timeoutMs);
+ Builder timeoutMS(Integer timeoutMS);
Builder messageLimit(Integer messageLimit);
}
@@ -65,4 +62,4 @@ public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig {
return ImmutableDmaapConsumerConfiguration.builder();
}
-} \ No newline at end of file
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
index 31bbfc0e..0b1d99eb 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.config;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
index cd520569..d0918446 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,16 +13,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.config;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -36,8 +31,8 @@ public abstract class DmaapPublisherConfiguration implements DmaapCustomConfig {
private static final long serialVersionUID = 1L;
- interface Builder extends
- DmaapCustomConfig.Builder<DmaapPublisherConfiguration, DmaapPublisherConfiguration.Builder> {
+ interface Builder
+ extends DmaapCustomConfig.Builder<DmaapPublisherConfiguration, DmaapPublisherConfiguration.Builder> {
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index b4cbfeea..d5878b0d 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service;
@@ -23,18 +21,18 @@ package org.onap.dcaegen2.collectors.datafile.service;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig;
-import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
-public class DMaaPReactiveWebClient {
+public class DmaapReactiveWebClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -43,12 +41,12 @@ public class DMaaPReactiveWebClient {
private String dmaaPUserPassword;
/**
- * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig.
+ * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
*
* @param dmaapCustomConfig - configuration object
- * @return DMaaPReactiveWebClient
+ * @return DmaapReactiveWebClient
*/
- public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
+ public DmaapReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
new file mode 100644
index 00000000..2b44233f
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import org.apache.http.HttpStatus;
+
+public final class HttpUtils implements HttpStatus {
+
+ private HttpUtils() {}
+
+ public static boolean isSuccessfulResponseCode(Integer statusCode) {
+ return statusCode >= 200 && statusCode < 300;
+ }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
index 1fcebeac..ad9e6fe7 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -1,9 +1,7 @@
/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,26 +13,28 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.service.consumer;
import java.net.URI;
import java.net.URISyntaxException;
+
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-public class DMaaPConsumerReactiveHttpClient {
+public class DmaapConsumerReactiveHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -47,11 +47,11 @@ public class DMaaPConsumerReactiveHttpClient {
private final String consumerId;
/**
- * Constructor of DMaaPConsumerReactiveHttpClient.
+ * Constructor of DmaapConsumerReactiveHttpClient.
*
* @param consumerConfiguration - DMaaP consumer configuration object
*/
- public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
+ public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
this.dmaapHostName = consumerConfiguration.dmaapHostName();
this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
@@ -65,7 +65,7 @@ public class DMaaPConsumerReactiveHttpClient {
*
* @return reactive response from DMaaP in string format
*/
- public Mono<String> getDMaaPConsumerResponse() {
+ public Mono<String> getDmaapConsumerResponse() {
try {
return webClient
.get()
@@ -78,7 +78,7 @@ public class DMaaPConsumerReactiveHttpClient {
Mono.error(new Exception("HTTP 500")))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI ");
+ logger.error("Unable to parse URI in message from xNF.", e);
return Mono.error(e);
}
}
@@ -87,7 +87,7 @@ public class DMaaPConsumerReactiveHttpClient {
return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
}
- public DMaaPConsumerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
+ public DmaapConsumerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
this.webClient = webClient;
return this;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java
deleted file mode 100644
index c6889df4..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Datafile Collector Service
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service.producer;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.apache.http.client.utils.URIBuilder;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
- */
-public class DMaaPProducerReactiveHttpClient {
-
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- private WebClient webClient;
- private final String dmaapHostName;
- private final Integer dmaapPortNumber;
- private final String dmaapProtocol;
- private final String dmaapTopicName;
-
- /**
- * Constructor DMaaPProducerReactiveHttpClient.
- *
- * @param dmaapPublisherConfiguration - DMaaP producer configuration object
- */
- public DMaaPProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
- this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
- this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
- this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
- this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
- }
-
- /**
- * Function for calling DMaaP HTTP producer - post request to DMaaP.
- *
- * @param consumerDmaapModelMono - object which will be sent to DMaaP
- * @return status code of operation
- */
- public Mono<String> getDMaaPProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
- try {
- return webClient
- .post()
- .uri(getUri())
- .body(BodyInserters.fromObject(consumerDmaapModelMono))
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
- )
- .onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
- .bodyToMono(String.class);
- } catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI");
- return Mono.error(e);
- }
- }
-
- public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
- this.webClient = webClient;
- return this;
- }
-
- URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(dmaapTopicName).build();
- }
-
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
new file mode 100644
index 00000000..8010bdc1
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service.producer;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.utils.URIBuilder;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class DmaapProducerReactiveHttpClient {
+
+ private static final String X_ATT_DR_META = "X-ATT-DR-META";
+ private static final String LOCATION = "location";
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private WebClient webClient;
+ private final String dmaapHostName;
+ private final Integer dmaapPortNumber;
+ private final String dmaapProtocol;
+ private final String dmaapTopicName;
+ private final String dmaapContentType;
+
+ /**
+ * Constructor DmaapProducerReactiveHttpClient.
+ *
+ * @param dmaapPublisherConfiguration - DMaaP producer configuration object
+ */
+ public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+
+ this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
+ this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
+ this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
+ this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
+ }
+
+ /**
+ * Function for calling DMaaP HTTP producer - post request to DMaaP.
+ *
+ * @param consumerDmaapModelMono - object which will be sent to DMaaP
+ * @return status code of operation
+ */
+ public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
+ consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
+ return Mono.just(HttpStatus.OK.toString());
+ }
+
+ public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
+ this.webClient = webClient;
+ return this;
+ }
+
+ private void postFilesAndData(List<ConsumerDmaapModel> models) {
+ for (ConsumerDmaapModel consumerDmaapModel : models) {
+ postFileAndData(consumerDmaapModel);
+ }
+ }
+
+ private void postFileAndData(ConsumerDmaapModel model) {
+ RequestBodyUriSpec post = webClient.post();
+
+ boolean headPrepared = prepareHead(model, post);
+
+ if (headPrepared) {
+ prepareBody(model, post);
+
+ ResponseSpec responseSpec = post.retrieve();
+ responseSpec.onStatus(HttpStatus::is4xxClientError,
+ clientResponse -> handlePostErrors(model, clientResponse));
+ responseSpec.onStatus(HttpStatus::is5xxServerError,
+ clientResponse -> handlePostErrors(model, clientResponse));
+ String bodyToMono = responseSpec.bodyToMono(String.class).block();
+ }
+ }
+
+ private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
+ boolean result = true;
+ try {
+ post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ metaData.getAsJsonObject().remove(LOCATION);
+ post.header(X_ATT_DR_META, metaData.toString());
+
+ post.uri(getUri());
+ } catch (Exception e) {
+ logger.error("Unable to post file to Data Router. " + model, e);
+ result = false;
+ }
+
+ return result;
+ }
+
+ private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
+ String fileLocation = model.getLocation();
+ File fileResource = new File(fileLocation);
+ FileSystemResource httpResource = new FileSystemResource(fileResource);
+ post.body(BodyInserters.fromResource(httpResource));
+ }
+
+ private URI getUri() throws URISyntaxException {
+ return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+ .setPath(dmaapTopicName).build();
+ }
+
+ private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
+ String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
+ logger.error(errorMessage);
+
+ return Mono.error(new Exception(errorMessage));
+ }
+}