diff options
Diffstat (limited to 'datafile-dmaap-client/src/main')
8 files changed, 222 insertions, 149 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java index 57b11127..dd7519f9 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,14 +13,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.config; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; import org.springframework.stereotype.Component; /** @@ -43,20 +40,20 @@ public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig { public abstract String consumerGroup(); @Value.Parameter - public abstract Integer timeoutMs(); + public abstract Integer timeoutMS(); @Value.Parameter public abstract Integer messageLimit(); - public interface Builder extends - DmaapCustomConfig.Builder<DmaapConsumerConfiguration, DmaapConsumerConfiguration.Builder> { + public interface Builder + extends DmaapCustomConfig.Builder<DmaapConsumerConfiguration, DmaapConsumerConfiguration.Builder> { Builder consumerId(String consumerId); Builder consumerGroup(String consumerGroup); - Builder timeoutMs(Integer timeoutMs); + Builder timeoutMS(Integer timeoutMS); Builder messageLimit(Integer messageLimit); } @@ -65,4 +62,4 @@ public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig { return ImmutableDmaapConsumerConfiguration.builder(); } -}
\ No newline at end of file +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java index 31bbfc0e..0b1d99eb 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.config; diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java index cd520569..d0918446 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,16 +13,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.config; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig; -import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -36,8 +31,8 @@ public abstract class DmaapPublisherConfiguration implements DmaapCustomConfig { private static final long serialVersionUID = 1L; - interface Builder extends - DmaapCustomConfig.Builder<DmaapPublisherConfiguration, DmaapPublisherConfiguration.Builder> { + interface Builder + extends DmaapCustomConfig.Builder<DmaapPublisherConfiguration, DmaapPublisherConfiguration.Builder> { } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java index b4cbfeea..d5878b0d 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.service; @@ -23,18 +21,18 @@ package org.onap.dcaegen2.collectors.datafile.service; import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig; -import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; + import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 */ -public class DMaaPReactiveWebClient { +public class DmaapReactiveWebClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -43,12 +41,12 @@ public class DMaaPReactiveWebClient { private String dmaaPUserPassword; /** - * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig. + * Creating DmaapReactiveWebClient passing to them basic DmaapConfig. * * @param dmaapCustomConfig - configuration object - * @return DMaaPReactiveWebClient + * @return DmaapReactiveWebClient */ - public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { + public DmaapReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { this.dmaaPUserName = dmaapCustomConfig.dmaapUserName(); this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword(); this.dmaaPContentType = dmaapCustomConfig.dmaapContentType(); diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java new file mode 100644 index 00000000..2b44233f --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import org.apache.http.HttpStatus; + +public final class HttpUtils implements HttpStatus { + + private HttpUtils() {} + + public static boolean isSuccessfulResponseCode(Integer statusCode) { + return statusCode >= 200 && statusCode < 300; + } +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java index 1fcebeac..ad9e6fe7 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,26 +13,28 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.service.consumer; import java.net.URI; import java.net.URISyntaxException; + import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClient; + import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public class DMaaPConsumerReactiveHttpClient { +public class DmaapConsumerReactiveHttpClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -47,11 +47,11 @@ public class DMaaPConsumerReactiveHttpClient { private final String consumerId; /** - * Constructor of DMaaPConsumerReactiveHttpClient. + * Constructor of DmaapConsumerReactiveHttpClient. * * @param consumerConfiguration - DMaaP consumer configuration object */ - public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) { + public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) { this.dmaapHostName = consumerConfiguration.dmaapHostName(); this.dmaapProtocol = consumerConfiguration.dmaapProtocol(); this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber(); @@ -65,7 +65,7 @@ public class DMaaPConsumerReactiveHttpClient { * * @return reactive response from DMaaP in string format */ - public Mono<String> getDMaaPConsumerResponse() { + public Mono<String> getDmaapConsumerResponse() { try { return webClient .get() @@ -78,7 +78,7 @@ public class DMaaPConsumerReactiveHttpClient { Mono.error(new Exception("HTTP 500"))) .bodyToMono(String.class); } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI "); + logger.error("Unable to parse URI in message from xNF.", e); return Mono.error(e); } } @@ -87,7 +87,7 @@ public class DMaaPConsumerReactiveHttpClient { return dmaapTopicName + "/" + consumerGroup + "/" + consumerId; } - public DMaaPConsumerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { + public DmaapConsumerReactiveHttpClient createDmaapWebClient(WebClient webClient) { this.webClient = webClient; return this; } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java deleted file mode 100644 index c6889df4..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.service.producer; - -import java.net.URI; -import java.net.URISyntaxException; -import org.apache.http.client.utils.URIBuilder; -import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 - */ -public class DMaaPProducerReactiveHttpClient { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private WebClient webClient; - private final String dmaapHostName; - private final Integer dmaapPortNumber; - private final String dmaapProtocol; - private final String dmaapTopicName; - - /** - * Constructor DMaaPProducerReactiveHttpClient. - * - * @param dmaapPublisherConfiguration - DMaaP producer configuration object - */ - public DMaaPProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { - this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); - this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); - this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); - this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); - } - - /** - * Function for calling DMaaP HTTP producer - post request to DMaaP. - * - * @param consumerDmaapModelMono - object which will be sent to DMaaP - * @return status code of operation - */ - public Mono<String> getDMaaPProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) { - try { - return webClient - .post() - .uri(getUri()) - .body(BodyInserters.fromObject(consumerDmaapModelMono)) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("HTTP 400")) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("HTTP 500"))) - .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI"); - return Mono.error(e); - } - } - - public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { - this.webClient = webClient; - return this; - } - - URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(dmaapTopicName).build(); - } - -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java new file mode 100644 index 00000000..8010bdc1 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -0,0 +1,151 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import org.apache.http.HttpHeaders; +import org.apache.http.client.utils.URIBuilder; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; + +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public class DmaapProducerReactiveHttpClient { + + private static final String X_ATT_DR_META = "X-ATT-DR-META"; + private static final String LOCATION = "location"; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private WebClient webClient; + private final String dmaapHostName; + private final Integer dmaapPortNumber; + private final String dmaapProtocol; + private final String dmaapTopicName; + private final String dmaapContentType; + + /** + * Constructor DmaapProducerReactiveHttpClient. + * + * @param dmaapPublisherConfiguration - DMaaP producer configuration object + */ + public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { + + this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); + this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); + this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); + this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); + this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); + } + + /** + * Function for calling DMaaP HTTP producer - post request to DMaaP. + * + * @param consumerDmaapModelMono - object which will be sent to DMaaP + * @return status code of operation + */ + public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) { + consumerDmaapModelMono.subscribe(models -> postFilesAndData(models)); + return Mono.just(HttpStatus.OK.toString()); + } + + public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { + this.webClient = webClient; + return this; + } + + private void postFilesAndData(List<ConsumerDmaapModel> models) { + for (ConsumerDmaapModel consumerDmaapModel : models) { + postFileAndData(consumerDmaapModel); + } + } + + private void postFileAndData(ConsumerDmaapModel model) { + RequestBodyUriSpec post = webClient.post(); + + boolean headPrepared = prepareHead(model, post); + + if (headPrepared) { + prepareBody(model, post); + + ResponseSpec responseSpec = post.retrieve(); + responseSpec.onStatus(HttpStatus::is4xxClientError, + clientResponse -> handlePostErrors(model, clientResponse)); + responseSpec.onStatus(HttpStatus::is5xxServerError, + clientResponse -> handlePostErrors(model, clientResponse)); + String bodyToMono = responseSpec.bodyToMono(String.class).block(); + } + } + + private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { + boolean result = true; + try { + post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType); + + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); + metaData.getAsJsonObject().remove(LOCATION); + post.header(X_ATT_DR_META, metaData.toString()); + + post.uri(getUri()); + } catch (Exception e) { + logger.error("Unable to post file to Data Router. " + model, e); + result = false; + } + + return result; + } + + private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) { + String fileLocation = model.getLocation(); + File fileResource = new File(fileLocation); + FileSystemResource httpResource = new FileSystemResource(fileResource); + post.body(BodyInserters.fromResource(httpResource)); + } + + private URI getUri() throws URISyntaxException { + return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) + .setPath(dmaapTopicName).build(); + } + + private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) { + String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString(); + logger.error(errorMessage); + + return Mono.error(new Exception(errorMessage)); + } +} |